Merge "Bug 4957 Cleaning code and fixing issues from manual tests"
authormichal rehak <mirehak@cisco.com>
Wed, 2 Mar 2016 16:46:59 +0000 (16:46 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Wed, 2 Mar 2016 16:46:59 +0000 (16:46 +0000)
13 files changed:
openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/role/RoleContext.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceContextImpl.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceManagerImpl.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/TransactionChainManager.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/role/RoleContextImpl.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/role/RoleManagerImpl.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/rpc/RpcContextImpl.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/rpc/RpcManagerImpl.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/services/SalRoleServiceImpl.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/statistics/StatisticsContextImpl.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/statistics/StatisticsManagerImpl.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/util/MdSalRegistratorUtils.java
openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/device/DeviceManagerImplTest.java

index 71cc8d60293623956b1c8bc7852822a46b00852f..ab95af91dd1bd728531e83dce9fd25eb6ebfacbc 100644 (file)
@@ -17,8 +17,6 @@ import org.opendaylight.openflowplugin.api.openflow.device.RequestContextStack;
  */
 public interface RoleContext extends RoleChangeListener, RequestContextStack {
 
-    void setTxLockOwned(boolean txLockOwned);
-
     /**
      * Initialization method is responsible for a registration of
      * {@link org.opendaylight.controller.md.sal.common.api.clustering.Entity} and listen for notification from service.
@@ -47,5 +45,4 @@ public interface RoleContext extends RoleChangeListener, RequestContextStack {
 
     DeviceContext getDeviceContext();
 
-    boolean isTxLockOwned();
 }
index 70de1190aa69736d31f1456ab2ba85dfea826cd7..f5ad89c479a7e0fb597ce8762673f0d04615293f 100644 (file)
@@ -36,7 +36,6 @@ import org.opendaylight.controller.md.sal.binding.api.NotificationService;
 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueue;
 import org.opendaylight.openflowjava.protocol.api.keys.MessageTypeKey;
@@ -280,6 +279,11 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi
             LOG.warn(errMsg);
             return Futures.immediateFailedFuture(new IllegalStateException(errMsg));
         }
+        if (rpcContext == null) {
+            final String errMsg = String.format("DeviceCtx {} is up but we are missing RpcContext", deviceState.getNodeId());
+            LOG.warn(errMsg);
+            return Futures.immediateFailedFuture(new IllegalStateException(errMsg));
+        }
 
         final InstanceIdentifier<FlowCapableNode> ofNodeII = deviceState.getNodeInstanceIdentifier()
                 .augmentation(FlowCapableNode.class);
@@ -291,7 +295,7 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi
                 new AsyncFunction<Optional<FlowCapableNode>, Void>() {
                     @Override
                     public ListenableFuture<Void> apply(final Optional<FlowCapableNode> input) throws Exception {
-                        if (!input.isPresent() || input.get().getTable() != null || input.get().getTable().isEmpty()) {
+                        if (!input.isPresent() || input.get().getTable() == null || input.get().getTable().isEmpty()) {
                             /* Last master close fail scenario so we would like to activate TxManager */
                             LOG.debug("Operational DS for Device {} has to be replaced", deviceState.getNodeId());
                             getDeviceState().setDeviceSynchronized(false);
@@ -318,22 +322,25 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi
 
             @Override
             public Void apply(final Boolean input) {
+                if (ConnectionContext.CONNECTION_STATE.RIP.equals(getPrimaryConnectionContext().getConnectionState())) {
+                    final String errMsg = String.format("We lost connection for Device {}, context has to be closed.",
+                            getDeviceState().getNodeId());
+                    LOG.warn(errMsg);
+                    transactionChainManager.clearUnsubmittedTransaction();
+                    throw new IllegalStateException(errMsg);
+                }
                 if (!input.booleanValue()) {
-                    LOG.warn("Get Initial Device {} information fails", getDeviceState().getNodeId());
-                    DeviceContextImpl.this.close();
-                    return null;
+                    final String errMsg = String.format("Get Initial Device {} information fails",
+                            getDeviceState().getNodeId());
+                    LOG.warn(errMsg);
+                    transactionChainManager.clearUnsubmittedTransaction();
+                    throw new IllegalStateException(errMsg);
                 }
                 LOG.debug("Get Initial Device {} information is successful", getDeviceState().getNodeId());
                 getDeviceState().setDeviceSynchronized(true);
                 transactionChainManager.activateTransactionManager();
-                //TODO: This is relevant for slave to master scenario make verify
-                if (null != rpcContext) {
-                    MdSalRegistratorUtils.registerMasterServices(getRpcContext(), DeviceContextImpl.this, role);
-                    getRpcContext().registerStatCompatibilityServices();
-                } else {
-                    LOG.warn("No RpcCtx on deviceCtx: {}, cannot register services", this);
-                    // TODO : can we stay without RPCs or we need to call DeviceCtx.close ?
-                }
+                MdSalRegistratorUtils.registerMasterServices(getRpcContext(), DeviceContextImpl.this, role);
+                getRpcContext().registerStatCompatibilityServices();
                 initialSubmitTransaction();
                 getDeviceState().setStatisticsPollingEnabledProp(true);
                 return null;
@@ -559,7 +566,7 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi
             deviceFlowRegistry.close();
             deviceMeterRegistry.close();
 
-            final CheckedFuture<Void, TransactionCommitFailedException> future = transactionChainManager.shuttingDown();
+            final ListenableFuture<Void> future = transactionChainManager.shuttingDown();
             Futures.addCallback(future, new FutureCallback<Void>() {
 
                 @Override
index a614509d8f769a67c7dbe03fe3541edef6a108f1..c13d9e09adbd0991704f6842d0ff4acd0a62b848 100644 (file)
@@ -9,8 +9,11 @@ package org.opendaylight.openflowplugin.impl.device;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Verify;
+import com.google.common.collect.Iterators;
 import io.netty.util.HashedWheelTimer;
 import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
@@ -102,24 +105,12 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi
     }
 
     @Override
-    public void onDeviceContextLevelUp(final DeviceContext deviceContext) {
+    public void onDeviceContextLevelUp(final DeviceContext deviceContext) throws Exception {
         // final phase - we have to add new Device to MD-SAL DataStore
         LOG.debug("Final phase of DeviceContextLevelUp for Node: {} ", deviceContext.getDeviceState().getNodeId());
         Preconditions.checkNotNull(deviceContext);
-        try {
-            ((DeviceContextImpl) deviceContext).initialSubmitTransaction();
-            deviceContext.onPublished();
-
-        } catch (final Exception e) {
-            LOG.warn("Node {} can not be add to OPERATIONAL DataStore yet because {} ", deviceContext.getDeviceState().getNodeId(), e.getMessage());
-            LOG.trace("Problem with add node {} to OPERATIONAL DataStore", deviceContext.getDeviceState().getNodeId(), e);
-            try {
-                deviceContext.close();
-            } catch (final Exception e1) {
-                LOG.warn("Exception on device context close. ", e);
-            }
-        }
-
+        ((DeviceContextImpl) deviceContext).initialSubmitTransaction();
+        deviceContext.onPublished();
     }
 
     @Override
@@ -212,9 +203,10 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi
     }
 
     @Override
-    public void close() throws Exception {
-        for (final DeviceContext deviceContext : deviceContexts.values()) {
-            deviceContext.close();
+    public void close() {
+        for (final Iterator<Entry<NodeId, DeviceContext>> iterator = Iterators
+                .consumingIterator(deviceContexts.entrySet().iterator()); iterator.hasNext();) {
+            iterator.next().getValue().close();
         }
     }
 
index 23a7485982ac99742f20ba184c1dce0e4f184255..36685c5182bdd61667e65a1c71601b0861cede20 100644 (file)
@@ -9,7 +9,10 @@
 package org.opendaylight.openflowplugin.impl.device;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Verify;
+import com.google.common.util.concurrent.AsyncFunction;
 import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.FutureFallback;
@@ -20,13 +23,17 @@ import javax.annotation.Nullable;
 import javax.annotation.concurrent.GuardedBy;
 import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
@@ -119,8 +126,8 @@ class TransactionChainManager implements TransactionChainListener, AutoCloseable
      * Call this method for SLAVE only.
      * @return Future
      */
-    public CheckedFuture<Void, TransactionCommitFailedException> deactivateTransactionManager() {
-        final CheckedFuture<Void, TransactionCommitFailedException> future;
+    public ListenableFuture<Void> deactivateTransactionManager() {
+        final ListenableFuture<Void> future;
         synchronized (txLock) {
             if (TransactionChainManagerStatus.WORKING.equals(transactionChainManagerStatus)) {
                 LOG.debug("Submitting all transactions if we were in status WORKING for Node", deviceState.getNodeId());
@@ -242,9 +249,9 @@ class TransactionChainManager implements TransactionChainListener, AutoCloseable
         submitIsEnabled = true;
     }
 
-    CheckedFuture<Void, TransactionCommitFailedException> shuttingDown() {
+    ListenableFuture<Void> shuttingDown() {
         LOG.debug("TxManager is going SUTTING_DOWN for node {}", nodeII);
-        CheckedFuture<Void, TransactionCommitFailedException> future;
+        ListenableFuture<Void> future;
         synchronized (txLock) {
             this.transactionChainManagerStatus = TransactionChainManagerStatus.SHUTTING_DOWN;
             future = txChainShuttingDown();
@@ -252,8 +259,8 @@ class TransactionChainManager implements TransactionChainListener, AutoCloseable
         return future;
     }
 
-    private CheckedFuture<Void, TransactionCommitFailedException> txChainShuttingDown() {
-        CheckedFuture<Void, TransactionCommitFailedException> future;
+    private ListenableFuture<Void> txChainShuttingDown() {
+        ListenableFuture<Void> future;
         if (txChainFactory == null) {
             // stay with actual thread
             future = Futures.immediateCheckedFuture(null);
@@ -266,19 +273,56 @@ class TransactionChainManager implements TransactionChainListener, AutoCloseable
             wTx.merge(LogicalDatastoreType.OPERATIONAL, nodeII, nodeBuilder.build());
             future = wTx.submit();
             wTx = null;
-            Futures.withFallback(future, new FutureFallback<Void>() {
+
+            future = Futures.withFallback(future, new FutureFallback<Void>() {
 
                 @Override
                 public ListenableFuture<Void> create(final Throwable t) throws Exception {
-                    final WriteTransaction delWtx = dataBroker.newWriteOnlyTransaction();
-                    delWtx.put(LogicalDatastoreType.OPERATIONAL, nodeII, nodeBuilder.build());
-                    return delWtx.submit();
+                    LOG.debug("Last ShuttingDown Transaction for node {} fail. Put empty FlowCapableNode",
+                            deviceState.getNodeId());
+                    final ReadOnlyTransaction readWriteTx = dataBroker.newReadOnlyTransaction();
+                    final CheckedFuture<Optional<FlowCapableNode>, ReadFailedException> readFlowNode = readWriteTx
+                            .read(LogicalDatastoreType.OPERATIONAL, nodeII.augmentation(FlowCapableNode.class));
+                    return Futures.transform(readFlowNode, new AsyncFunction<Optional<FlowCapableNode>, Void>() {
+
+                        @Override
+                        public ListenableFuture<Void> apply(final Optional<FlowCapableNode> input) {
+                            if (input.isPresent()) {
+                                final WriteTransaction delWtx = dataBroker.newWriteOnlyTransaction();
+                                nodeBuilder.addAugmentation(FlowCapableNode.class, new FlowCapableNodeBuilder().build());
+                                delWtx.put(LogicalDatastoreType.OPERATIONAL, nodeII, nodeBuilder.build());
+                                return delWtx.submit();
+                            }
+                            return Futures.immediateFuture(null);
+                        }
+                    });
                 }
             });
         }
         return future;
     }
 
+    /**
+     * Transaction could be close if we are not submit anything. We have property submitIsEnable what
+     * could protect us for check it is NEW transaction from chain and we are able close everything
+     * safely.
+     */
+    void clearUnsubmittedTransaction() {
+        LOG.debug("Cleaning unsubmited Transaction for Device {}", deviceState.getNodeId());
+        Verify.verify(!submitIsEnabled, "We are not able clean TxChain {}", deviceState.getNodeId());
+        synchronized (txLock) {
+            if (wTx != null) {
+                wTx.cancel();
+                wTx = null;
+            }
+            if (txChainFactory != null) {
+                txChainFactory.close();
+                txChainFactory = null;
+            }
+            transactionChainManagerStatus = TransactionChainManagerStatus.SLEEPING;
+        }
+    }
+
     @Override
     public void close() {
         LOG.debug("Setting transactionChainManagerStatus to SHUTTING_DOWN, will wait for ownershipservice to notify", nodeII);
index 78eb72402d30a2b08a8489be9f0e2d0e0290e9b9..d2e8fb80e056b8d9e125369bb66496e264729ebe 100644 (file)
@@ -64,7 +64,6 @@ public class RoleContextImpl implements RoleContext {
 
     private final Semaphore mainCandidateGuard = new Semaphore(1, true);
     private final Semaphore txCandidateGuard = new Semaphore(1, true);
-    private volatile boolean txLockOwned;
 
     public RoleContextImpl(final DeviceContext deviceContext, final EntityOwnershipService entityOwnershipService,
                            final Entity entity, final Entity txEnitity) {
@@ -210,16 +209,6 @@ public class RoleContextImpl implements RoleContext {
         return txCandidateGuard;
     }
 
-    @Override
-    public boolean isTxLockOwned() {
-        return txLockOwned;
-    }
-
-    @Override
-    public void setTxLockOwned(final boolean txLockOwned) {
-        this.txLockOwned = txLockOwned;
-    }
-
     private ListenableFuture<Void> sendRoleChangeToDevice(final OfpRole newRole, final AsyncFunction<RpcResult<SetRoleOutput>, Void> function) {
         LOG.debug("Send new Role {} to Device {}", newRole, deviceContext.getDeviceState().getNodeId());
         final Future<RpcResult<SetRoleOutput>> setRoleOutputFuture;
index 57683dafd14379cea00b34980a09412c6b2210d4..b365c797b7eadb1b298bd5e951bd82c02c04e9f9 100644 (file)
@@ -10,12 +10,15 @@ package org.opendaylight.openflowplugin.impl.role;
 import com.google.common.base.Function;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
 import com.google.common.base.Verify;
+import com.google.common.collect.Iterators;
 import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
-import java.util.Map;
+import java.util.Iterator;
+import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Semaphore;
@@ -93,46 +96,29 @@ public class RoleManagerImpl implements RoleManager, EntityOwnershipListener {
         deviceInitializationPhaseHandler.onDeviceContextLevelUp(deviceContext);
     }
 
-    void getRoleContextLevelUp(final DeviceContext deviceContext) {
-        LOG.debug("Created role context for node {}", deviceContext.getDeviceState().getNodeId());
-        LOG.debug("roleChangeFuture success for device:{}. Moving to StatisticsManager", deviceContext.getDeviceState().getNodeId());
-        try {
-            deviceInitializationPhaseHandler.onDeviceContextLevelUp(deviceContext);
-        } catch (final Exception e) {
-            LOG.info("failed to complete levelUp on next handler for device {}",
-                    deviceContext.getDeviceState().getNodeId());
-            deviceContext.close();
-            return;
-        }
-    }
-
     @Override
-    public void close() throws Exception {
+    public void close() {
         entityOwnershipListenerRegistration.close();
         txEntityOwnershipListenerRegistration.close();
-        for (final Map.Entry<Entity, RoleContext> roleContextEntry : contexts.entrySet()) {
+        for (final Iterator<Entry<Entity, RoleContext>> iterator = Iterators.consumingIterator(contexts.entrySet()
+                .iterator()); iterator.hasNext();) {
             // got here because last known role is LEADER and DS might need clearing up
-            final Entity entity = roleContextEntry.getKey();
-            final Optional<EntityOwnershipState> ownershipState = entityOwnershipService.getOwnershipState(entity);
-            final NodeId nodeId = roleContextEntry.getValue().getDeviceState().getNodeId();
-            if (ownershipState.isPresent()) {
-                if ((!ownershipState.get().hasOwner())) {
-                    LOG.trace("Last role is LEADER and ownershipService returned hasOwner=false for node: {}; " +
-                            "cleaning DS as being probably the last owner", nodeId);
-                    removeDeviceFromOperDS(roleContextEntry.getValue());
-                } else {
-                    // NOOP - there is another owner
-                    LOG.debug("Last role is LEADER and ownershipService returned hasOwner=true for node: {}; " +
-                            "leaving DS untouched", nodeId);
-                }
+            final Entry<Entity, RoleContext> entry = iterator.next();
+            final RoleContext roleCtx = entry.getValue();
+            final NodeId nodeId = roleCtx.getDeviceState().getNodeId();
+            if (OfpRole.BECOMEMASTER.equals(roleCtx.getDeviceState().getRole())) {
+                LOG.trace("Last role is LEADER and ownershipService returned hasOwner=false for node: {}; "
+                        + "cleaning DS as being probably the last owner", nodeId);
+                removeDeviceFromOperDS(roleCtx);
             } else {
-                // TODO: is this safe? When could this happen?
-                LOG.warn("Last role is LEADER but ownershipService returned empty ownership info for node: {}; " +
-                        "cleaning DS ANYWAY!", nodeId);
-                removeDeviceFromOperDS(roleContextEntry.getValue());
+                // NOOP - there is another owner
+                LOG.debug("Last role is LEADER and ownershipService returned hasOwner=true for node: {}; "
+                        + "leaving DS untouched", nodeId);
             }
+            roleCtx.suspendTxCandidate();
+            txContexts.remove(roleCtx.getTxEntity(), roleCtx);
+            roleCtx.close();
         }
-        contexts.clear();
     }
 
     @Override
@@ -179,25 +165,23 @@ public class RoleManagerImpl implements RoleManager, EntityOwnershipListener {
     @Override
     public void ownershipChanged(final EntityOwnershipChange ownershipChange) {
         Preconditions.checkArgument(ownershipChange != null);
-        RoleContext roleCtxForClose = null;
+        RoleContext roleContext = null;
         try {
-            final RoleContext roleContext = contexts.get(ownershipChange.getEntity());
+            roleContext = contexts.get(ownershipChange.getEntity());
             if (roleContext != null) {
-                roleCtxForClose = roleContext;
                 changeForEntity(ownershipChange, roleContext);
                 return;
             }
 
-            final RoleContext txRoleContext = txContexts.get(ownershipChange.getEntity());
-            if (txRoleContext != null) {
-                roleCtxForClose = txRoleContext;
-                changeForTxEntity(ownershipChange, txRoleContext);
+            roleContext = txContexts.get(ownershipChange.getEntity());
+            if (roleContext != null) {
+                changeForTxEntity(ownershipChange, roleContext);
                 return;
             }
-        } catch (final InterruptedException e) {
-            LOG.warn("fail to acquire semaphore: {}", ownershipChange.getEntity());
-            if (roleCtxForClose != null) {
-                roleCtxForClose.close();
+        } catch (final Exception e) {
+            LOG.warn("fail to acquire semaphore: {}", ownershipChange.getEntity(), e);
+            if (roleContext != null) {
+                roleContext.getDeviceContext().close();
             }
         }
 
@@ -219,11 +203,6 @@ public class RoleManagerImpl implements RoleManager, EntityOwnershipListener {
         if (!ownershipChange.wasOwner() && ownershipChange.isOwner()) {
             // SLAVE -> MASTER - acquired transition lock
             LOG.debug("Acquired tx-lock for entity {}", ownershipChange.getEntity());
-            roleContext.setTxLockOwned(true);
-            final OfpRole role = roleContext.getDeviceState().getRole();
-            /* SLAVE to MASTER scenario has wait for TxEntity LEADER before sending ROLE to Device */
-            Verify.verify(OfpRole.BECOMESLAVE.equals(role),
-                    "Acquired tx-lock but current role = {}", role);
 
             // activate txChainManager, activate rpcs
             if (roleContext.getDeviceState().isValid()) {
@@ -245,7 +224,6 @@ public class RoleManagerImpl implements RoleManager, EntityOwnershipListener {
         } else if (ownershipChange.wasOwner() && !ownershipChange.isOwner()) {
             // MASTER -> SLAVE - released tx-lock
             LOG.debug("Released tx-lock for entity {}", ownershipChange.getEntity());
-            roleContext.setTxLockOwned(false);
             txContexts.remove(roleContext.getTxEntity(), roleContext);
             processingClosure = Futures.immediateFuture(null);
         } else {
@@ -266,8 +244,7 @@ public class RoleManagerImpl implements RoleManager, EntityOwnershipListener {
 
                     @Override
                     public void onFailure(final Throwable throwable) {
-                        LOG.warn("Unexpected error for Node {}, txLock={} -> terminating device context", nodeId,
-                                roleContext.isTxLockOwned(), throwable);
+                        LOG.warn("Unexpected error for Node {} -> terminating device context", nodeId, throwable);
                         txCandidateGuard.release();
                         deviceContext.close();
                     }
@@ -303,7 +280,7 @@ public class RoleManagerImpl implements RoleManager, EntityOwnershipListener {
                     // withdraw context from map in order to have it as before
                     txContexts.remove(roleContext.getTxEntity(), roleContext);
                     // no more propagating any role - there is no txCandidate lock approaching
-                    roleContext.getDeviceContext().close();
+                    Throwables.propagate(e);
                 }
                 return null;
             }
index 6299cacf6607d130053a8c6b4db488d36b14632d..ba7d36f4abf255960d5624be8a22117429a1126a 100644 (file)
@@ -7,6 +7,10 @@
  */
 package org.opendaylight.openflowplugin.impl.rpc;
 
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterators;
+import java.util.Iterator;
+import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Semaphore;
@@ -38,7 +42,7 @@ public class RpcContextImpl implements RpcContext {
     private final NotificationPublishService notificationPublishService;
 
     public RpcContextImpl(final MessageSpy messageSpy, final RpcProviderRegistry rpcProviderRegistry, final DeviceContext deviceContext,
- final int maxRequests, final boolean isStatisticsRpcEnabled,
           final int maxRequests, final boolean isStatisticsRpcEnabled,
             final NotificationPublishService notificationPublishService) {
         this.deviceContext = Preconditions.checkNotNull(deviceContext);
         this.messageSpy = Preconditions.checkNotNull(messageSpy);
@@ -46,6 +50,7 @@ public class RpcContextImpl implements RpcContext {
         this.isStatisticsRpcEnabled = isStatisticsRpcEnabled;
         this.notificationPublishService = notificationPublishService;
         tracker = new Semaphore(maxRequests, true);
+        deviceContext.setRpcContext(RpcContextImpl.this);
     }
 
     /**
@@ -64,6 +69,7 @@ public class RpcContextImpl implements RpcContext {
         }
     }
 
+    @Override
     public void registerStatCompatibilityServices() {
         if (isStatisticsRpcEnabled) {
             MdSalRegistratorUtils.registerStatCompatibilityServices(RpcContextImpl.this, deviceContext,
@@ -83,13 +89,14 @@ public class RpcContextImpl implements RpcContext {
      */
     @Override
     public void close() {
-        for (final RoutedRpcRegistration<?> rpcRegistration : rpcRegistrations.values()) {
+        for (final Iterator<Entry<Class<?>, RoutedRpcRegistration<?>>> iterator = Iterators
+                .consumingIterator(rpcRegistrations.entrySet().iterator()); iterator.hasNext();) {
+            final RoutedRpcRegistration<?> rpcRegistration = iterator.next().getValue();
             rpcRegistration.unregisterPath(NodeContext.class, deviceContext.getDeviceState().getNodeInstanceIdentifier());
             rpcRegistration.close();
             LOG.debug("Closing RPC Registration of service {} for device {}.", rpcRegistration.getServiceType(),
                     deviceContext.getDeviceState().getNodeInstanceIdentifier());
         }
-        rpcRegistrations.clear();
     }
 
     @Override
@@ -101,12 +108,19 @@ public class RpcContextImpl implements RpcContext {
             LOG.info("Acquired semaphore for {}, available permits:{} ", deviceContext.getDeviceState().getNodeId(), tracker.availablePermits());
         }
 
-        return new AbstractRequestContext<T>(deviceContext.reservedXidForDeviceMessage()) {
+        final Long xid = deviceContext.reservedXidForDeviceMessage();
+        if (xid == null) {
+            LOG.warn("Xid cannot be reserved for new RequestContext, node:{}", deviceContext.getDeviceState().getNodeId());
+            tracker.release();
+            return null;
+        }
+
+        return new AbstractRequestContext<T>(xid) {
             @Override
             public void close() {
                 tracker.release();
                 final long xid = getXid().getValue();
-                LOG.info("Removed request context with xid {}", xid);
+                LOG.trace("Removed request context with xid {}", xid);
                 messageSpy.spyMessage(RpcContextImpl.class, MessageSpy.STATISTIC_GROUP.REQUEST_STACK_FREED);
             }
         };
index f95d5ff809878e0136f20b207ff04be397fd34ae..523ccc5aeed58c17d063bbe9034b358b7faf086f 100644 (file)
@@ -8,6 +8,9 @@
 package org.opendaylight.openflowplugin.impl.rpc;
 
 import com.google.common.base.Verify;
+import com.google.common.collect.Iterators;
+import java.util.Iterator;
+import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
@@ -55,8 +58,6 @@ public class RpcManagerImpl implements RpcManager {
         Verify.verify(contexts.putIfAbsent(deviceContext, rpcContext) == null, "RpcCtx still not closed for node {}", nodeId);
         deviceContext.addDeviceContextClosedHandler(this);
 
-        //FIXME : propagate isStatisticsRpcEnabled to DeviceContext
-
         if (OfpRole.BECOMEMASTER.equals(ofpRole)) {
             LOG.info("Registering Openflow RPCs for node:{}, role:{}", nodeId, ofpRole);
             MdSalRegistratorUtils.registerMasterServices(rpcContext, deviceContext, ofpRole);
@@ -76,11 +77,11 @@ public class RpcManagerImpl implements RpcManager {
     }
 
     @Override
-    public void close() throws Exception {
-        for(final RpcContext ctx : contexts.values()) {
-            ctx.close();
+    public void close() {
+        for (final Iterator<Entry<DeviceContext, RpcContext>> iterator = Iterators
+                .consumingIterator(contexts.entrySet().iterator()); iterator.hasNext();) {
+            iterator.next().getValue().close();
         }
-        contexts.clear();
     }
 
 
index 866ab8fe290d53899db084e0d8619b1d03edb1b4..6731772da894058acfc51f21cb23a9e59e3e040e 100644 (file)
@@ -75,10 +75,10 @@ public final class SalRoleServiceImpl extends AbstractSimpleService<SetRoleInput
             LOG.trace("currentRole lock queue: " + currentRoleGuard.getQueueLength());
         } catch (final InterruptedException e) {
             LOG.warn("Unexpected exception for acquire semaphor for input {}", input);
-            return RpcResultBuilder.<SetRoleOutput> success().buildFuture();
+            return RpcResultBuilder.<SetRoleOutput> failed().buildFuture();
         }
         // compare with last known role and set if different. If they are same, then return.
-        if (currentRole == input.getControllerRole()) {
+        if (currentRole.equals(input.getControllerRole())) {
             LOG.info("Role to be set is same as the last known role for the device:{}. Hence ignoring.",
                     input.getControllerRole());
             currentRoleGuard.release();
@@ -88,13 +88,34 @@ public final class SalRoleServiceImpl extends AbstractSimpleService<SetRoleInput
         final SettableFuture<RpcResult<SetRoleOutput>> resultFuture = SettableFuture
                 .<RpcResult<SetRoleOutput>> create();
         repeaterForChangeRole(resultFuture, input, 0);
+        /* Add Callback for release Guard */
+        Futures.addCallback(resultFuture, new FutureCallback<RpcResult<SetRoleOutput>>() {
+
+            @Override
+            public void onSuccess(final RpcResult<SetRoleOutput> result) {
+                LOG.debug("SetRoleService for Node: {} is ok Role: {}", input.getNode().getValue(),
+                        input.getControllerRole());
+                currentRoleGuard.release();
+            }
+
+            @Override
+            public void onFailure(final Throwable t) {
+                LOG.warn("SetRoleService set Role {} for Node: {} fail.", input.getControllerRole(),
+                        input.getNode().getValue(), t);
+                currentRoleGuard.release();
+            }
+        });
         return resultFuture;
     }
 
     private void repeaterForChangeRole(final SettableFuture<RpcResult<SetRoleOutput>> future, final SetRoleInput input,
             final int retryCounter) {
+        if (future.isCancelled()) {
+            future.setException(new RoleChangeException(String.format(
+                    "Set Role for device %s stop because Future was canceled", input.getNode().getValue())));
+            return;
+        }
         if (retryCounter >= MAX_RETRIES) {
-            currentRoleGuard.release();
             future.setException(new RoleChangeException(String.format("Set Role failed after %s tries on device %s",
                     MAX_RETRIES, input.getNode().getValue())));
             return;
@@ -104,7 +125,6 @@ public final class SalRoleServiceImpl extends AbstractSimpleService<SetRoleInput
         switch (state) {
         case RIP:
             LOG.info("Device {} has been disconnected", input.getNode());
-            currentRoleGuard.release();
             future.setException(new Exception(String.format(
                     "Device connection doesn't exist anymore. Primary connection status : %s", state)));
             return;
@@ -113,7 +133,6 @@ public final class SalRoleServiceImpl extends AbstractSimpleService<SetRoleInput
             break;
         default:
             LOG.warn("Device {} is in state {}, role change is not allowed", input.getNode(), state);
-            currentRoleGuard.release();
             future.setException(new Exception(String.format("Unexcpected device connection status : %s", state)));
             return;
         }
@@ -126,7 +145,6 @@ public final class SalRoleServiceImpl extends AbstractSimpleService<SetRoleInput
             public void onSuccess(final SetRoleOutput result) {
                 LOG.info("setRoleOutput received after roleChangeTask execution:{}", result);
                 currentRole = input.getControllerRole();
-                currentRoleGuard.release();
                 future.set(RpcResultBuilder.<SetRoleOutput> success().withResult(result).build());
             }
 
index 65c7057ceeb0e18337850277527cd80abe8bc036..da90f9844c35a3072be31c2d7524fae2e495877e 100644 (file)
@@ -179,6 +179,15 @@ public class StatisticsContextImpl implements StatisticsContext {
         }
 
         void statChainFuture ( final Iterator<MultipartType> iterator, final SettableFuture<Boolean> resultFuture){
+
+            if (ConnectionContext.CONNECTION_STATE.RIP.equals(deviceContext.getPrimaryConnectionContext().getConnectionState())) {
+                final String errMsg = String.format("Device connection is closed for Node : %s.",
+                        deviceContext.getDeviceState().getNodeId());
+                LOG.debug(errMsg);
+                resultFuture.setException(new IllegalStateException(errMsg));
+                return;
+            }
+
             if (!iterator.hasNext()) {
                 resultFuture.set(Boolean.TRUE);
                 LOG.debug("Stats collection successfully finished for node {}", deviceContext.getDeviceState().getNodeId());
index bc2b4a33de60b1a8ac48b40ce21ecd618f6ac8aa..3cb77d0bddad4fb3a6ac6fda6c59d5511791d142 100644 (file)
@@ -21,13 +21,16 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Verify;
+import com.google.common.collect.Iterators;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import io.netty.util.HashedWheelTimer;
 import io.netty.util.Timeout;
 import io.netty.util.TimerTask;
+import java.util.Iterator;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Future;
@@ -112,8 +115,8 @@ public class StatisticsManagerImpl implements StatisticsManager, StatisticsManag
             }
             scheduleNextPolling(deviceContext, statisticsContext, new TimeCounter());
         }
-        deviceInitPhaseHandler.onDeviceContextLevelUp(deviceContext);
         deviceContext.getDeviceState().setDeviceSynchronized(true);
+        deviceInitPhaseHandler.onDeviceContextLevelUp(deviceContext);
     }
 
     private void initialStatPollForMaster(final StatisticsContext statisticsContext, final DeviceContext deviceContext) {
@@ -130,7 +133,7 @@ public class StatisticsManagerImpl implements StatisticsManager, StatisticsManag
                     LOG.trace("Device dynamic info collecting done. Going to announce raise to next level.");
                     try {
                         deviceInitPhaseHandler.onDeviceContextLevelUp(deviceContext);
-                    } catch (Exception e) {
+                    } catch (final Exception e) {
                         LOG.info("failed to complete levelUp on next handler for device {}", deviceContext.getDeviceState().getNodeId());
                         deviceContext.close();
                         return;
@@ -314,9 +317,9 @@ public class StatisticsManagerImpl implements StatisticsManager, StatisticsManag
             controlServiceRegistration.close();
             controlServiceRegistration = null;
         }
-        for (final StatisticsContext statCtx : contexts.values()) {
-            statCtx.close();
+        for (final Iterator<Entry<DeviceContext, StatisticsContext>> iterator = Iterators
+                .consumingIterator(contexts.entrySet().iterator()); iterator.hasNext();) {
+            iterator.next().getValue().close();
         }
-        contexts.clear();
     }
 }
index 7bf986bf20ffed68a8dad7466baabb417b0f217d..6b7d45f7f3a1b20fec90ebaf16ab1167acece5b1 100644 (file)
@@ -103,7 +103,7 @@ public class MdSalRegistratorUtils {
         Preconditions.checkArgument(rpcContext != null);
         Preconditions.checkArgument(newRole != null);
         Verify.verify(OfpRole.BECOMESLAVE.equals(newRole), "Service call with bad Role {} we expect role BECOMESLAVE", newRole);
-        
+
         unregisterServices(rpcContext);
     }
 
index e4154f6d9afe641ce638f95a05743153da7abd9a..8ec6befe162ce80efb255d3213d30781dc66b50d 100644 (file)
@@ -17,14 +17,12 @@ import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
-
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.FutureCallback;
 import java.lang.reflect.Field;
 import java.math.BigInteger;
 import java.util.Collections;
 import java.util.concurrent.ConcurrentHashMap;
-
-import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.FutureCallback;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -117,13 +115,13 @@ public class DeviceManagerImplTest {
         when(mockFeatures.getDatapathId()).thenReturn(BigInteger.valueOf(21L));
     }
 
-    @Test
-    public void onDeviceContextLevelUpFailTest() {
+    @Test(expected = IllegalStateException.class)
+    public void onDeviceContextLevelUpFailTest() throws Exception {
         onDeviceContextLevelUp(true);
     }
 
     @Test
-    public void onDeviceContextLevelUpSuccessTest() {
+    public void onDeviceContextLevelUpSuccessTest() throws Exception {
         onDeviceContextLevelUp(false);
     }
 
@@ -131,12 +129,12 @@ public class DeviceManagerImplTest {
         return prepareDeviceManager(false);
     }
 
-    private DeviceManagerImpl prepareDeviceManager(boolean withException) {
-        DataBroker mockedDataBroker = mock(DataBroker.class);
-        WriteTransaction mockedWriteTransaction = mock(WriteTransaction.class);
+    private DeviceManagerImpl prepareDeviceManager(final boolean withException) {
+        final DataBroker mockedDataBroker = mock(DataBroker.class);
+        final WriteTransaction mockedWriteTransaction = mock(WriteTransaction.class);
 
-        BindingTransactionChain mockedTxChain = mock(BindingTransactionChain.class);
-        WriteTransaction mockedWTx = mock(WriteTransaction.class);
+        final BindingTransactionChain mockedTxChain = mock(BindingTransactionChain.class);
+        final WriteTransaction mockedWTx = mock(WriteTransaction.class);
         when(mockedTxChain.newWriteOnlyTransaction()).thenReturn(mockedWTx);
         when(mockedDataBroker.createTransactionChain(any(TransactionChainListener.class))).thenReturn
                 (mockedTxChain);
@@ -144,17 +142,17 @@ public class DeviceManagerImplTest {
 
         when(mockedWriteTransaction.submit()).thenReturn(mockedFuture);
 
-        MessageIntelligenceAgency mockedMessageIntelligenceAgency = mock(MessageIntelligenceAgency.class);
-        DeviceManagerImpl deviceManager = new DeviceManagerImpl(mockedDataBroker, mockedMessageIntelligenceAgency,
+        final MessageIntelligenceAgency mockedMessageIntelligenceAgency = mock(MessageIntelligenceAgency.class);
+        final DeviceManagerImpl deviceManager = new DeviceManagerImpl(mockedDataBroker, mockedMessageIntelligenceAgency,
                 TEST_VALUE_GLOBAL_NOTIFICATION_QUOTA, false);
         deviceManager.setDeviceInitializationPhaseHandler(deviceInitPhaseHandler);
 
         return deviceManager;
     }
 
-    public void onDeviceContextLevelUp(boolean withException) {
-        DeviceManagerImpl deviceManager = prepareDeviceManager(withException);
-        DeviceState mockedDeviceState = mock(DeviceState.class);
+    public void onDeviceContextLevelUp(final boolean withException) throws Exception {
+        final DeviceManagerImpl deviceManager = prepareDeviceManager(withException);
+        final DeviceState mockedDeviceState = mock(DeviceState.class);
         when(mockedDeviceContext.getDeviceState()).thenReturn(mockedDeviceState);
         when(mockedDeviceState.getRole()).thenReturn(OfpRole.BECOMEMASTER);
 
@@ -173,13 +171,13 @@ public class DeviceManagerImplTest {
 
     @Test
     public void deviceConnectedTest() throws Exception{
-        DeviceManagerImpl deviceManager = prepareDeviceManager();
+        final DeviceManagerImpl deviceManager = prepareDeviceManager();
         injectMockTranslatorLibrary(deviceManager);
-        ConnectionContext mockConnectionContext = buildMockConnectionContext(OFConstants.OFP_VERSION_1_3);
+        final ConnectionContext mockConnectionContext = buildMockConnectionContext(OFConstants.OFP_VERSION_1_3);
 
         deviceManager.deviceConnected(mockConnectionContext);
 
-        InOrder order = inOrder(mockConnectionContext);
+        final InOrder order = inOrder(mockConnectionContext);
         order.verify(mockConnectionContext).getFeatures();
         order.verify(mockConnectionContext).setOutboundQueueProvider(any(OutboundQueueProvider.class));
         order.verify(mockConnectionContext).setOutboundQueueHandleRegistration(
@@ -190,21 +188,21 @@ public class DeviceManagerImplTest {
 
     @Test
     public void deviceConnectedV10Test() throws Exception{
-        DeviceManagerImpl deviceManager = prepareDeviceManager();
+        final DeviceManagerImpl deviceManager = prepareDeviceManager();
         injectMockTranslatorLibrary(deviceManager);
-        ConnectionContext mockConnectionContext = buildMockConnectionContext(OFConstants.OFP_VERSION_1_0);
+        final ConnectionContext mockConnectionContext = buildMockConnectionContext(OFConstants.OFP_VERSION_1_0);
 
-        PhyPortBuilder phyPort = new PhyPortBuilder()
+        final PhyPortBuilder phyPort = new PhyPortBuilder()
                 .setPortNo(41L);
         when(mockFeatures.getPhyPort()).thenReturn(Collections.singletonList(phyPort.build()));
-        MessageTranslator<Object, Object> mockedTranslator = Mockito.mock(MessageTranslator.class);
+        final MessageTranslator<Object, Object> mockedTranslator = Mockito.mock(MessageTranslator.class);
         when(mockedTranslator.translate(Matchers.<Object>any(), Matchers.<DeviceContext>any(), Matchers.any()))
                 .thenReturn(null);
         when(translatorLibrary.lookupTranslator(Matchers.<TranslatorKey>any())).thenReturn(mockedTranslator);
 
         deviceManager.deviceConnected(mockConnectionContext);
 
-        InOrder order = inOrder(mockConnectionContext);
+        final InOrder order = inOrder(mockConnectionContext);
         order.verify(mockConnectionContext).getFeatures();
         order.verify(mockConnectionContext).setOutboundQueueProvider(any(OutboundQueueProvider.class));
         order.verify(mockConnectionContext).setOutboundQueueHandleRegistration(
@@ -213,12 +211,12 @@ public class DeviceManagerImplTest {
         Mockito.verify(deviceInitPhaseHandler).onDeviceContextLevelUp(Matchers.<DeviceContext>any());
     }
 
-    protected ConnectionContext buildMockConnectionContext(short ofpVersion) {
+    protected ConnectionContext buildMockConnectionContext(final short ofpVersion) {
         when(mockFeatures.getVersion()).thenReturn(ofpVersion);
         when(outboundQueueProvider.reserveEntry()).thenReturn(43L);
         Mockito.doAnswer(new Answer<Void>() {
             @Override
-            public Void answer(InvocationOnMock invocation) throws Throwable {
+            public Void answer(final InvocationOnMock invocation) throws Throwable {
                 final FutureCallback<OfHeader> callBack = (FutureCallback<OfHeader>) invocation.getArguments()[2];
                 callBack.onSuccess(null);
                 return null;
@@ -230,8 +228,8 @@ public class DeviceManagerImplTest {
         when(mockedConnectionAdapter.registerOutboundQueueHandler(Matchers.<OutboundQueueHandler>any(), Matchers.anyInt(), Matchers.anyLong()))
                 .thenAnswer(new Answer<OutboundQueueHandlerRegistration<OutboundQueueHandler>>() {
                     @Override
-                    public OutboundQueueHandlerRegistration<OutboundQueueHandler> answer(InvocationOnMock invocation) throws Throwable {
-                        OutboundQueueHandler handler = (OutboundQueueHandler) invocation.getArguments()[0];
+                    public OutboundQueueHandlerRegistration<OutboundQueueHandler> answer(final InvocationOnMock invocation) throws Throwable {
+                        final OutboundQueueHandler handler = (OutboundQueueHandler) invocation.getArguments()[0];
                         handler.onConnectionQueueChanged(outboundQueueProvider);
                         return null;
                     }
@@ -241,13 +239,13 @@ public class DeviceManagerImplTest {
         return mockConnectionContext;
     }
 
-    private void injectMockTranslatorLibrary(DeviceManagerImpl deviceManager) {
+    private void injectMockTranslatorLibrary(final DeviceManagerImpl deviceManager) {
         deviceManager.setTranslatorLibrary(translatorLibrary);
     }
 
     @Test
     public void testClose() throws Exception {
-        DeviceContext deviceContext = Mockito.mock(DeviceContext.class);
+        final DeviceContext deviceContext = Mockito.mock(DeviceContext.class);
         final DeviceManagerImpl deviceManager = prepareDeviceManager();
         final ConcurrentHashMap<NodeId, DeviceContext> deviceContexts = getContextsCollection(deviceManager);
         deviceContexts.put(mockedNodeId, deviceContext);
@@ -258,7 +256,7 @@ public class DeviceManagerImplTest {
         Mockito.verify(deviceContext).close();
     }
 
-    private static ConcurrentHashMap<NodeId, DeviceContext> getContextsCollection(DeviceManagerImpl deviceManager) throws NoSuchFieldException, IllegalAccessException {
+    private static ConcurrentHashMap<NodeId, DeviceContext> getContextsCollection(final DeviceManagerImpl deviceManager) throws NoSuchFieldException, IllegalAccessException {
         // HACK: contexts collection for testing shall be accessed in some more civilized way
         final Field contextsField = DeviceManagerImpl.class.getDeclaredField("deviceContexts");
         Assert.assertNotNull(contextsField);