Fixed following issues 34/33534/7
authorAnil Vishnoi <vishnoianil@gmail.com>
Tue, 26 Jan 2016 07:20:58 +0000 (23:20 -0800)
committerAnil Vishnoi <vishnoianil@gmail.com>
Wed, 27 Jan 2016 18:30:38 +0000 (10:30 -0800)
1) Statistics collector to stop writing when
device is not connected and rpc's are deregistered
2) Stop send default slave role
3) Disable aggregate statistics (working on better implementation)

Change-Id: I3cb9e347e0cbe3f18222380c784ef4edc724b449
Signed-off-by: Anil Vishnoi <vishnoianil@gmail.com>
17 files changed:
applications/statistics-manager/src/main/java/org/opendaylight/openflowplugin/applications/statistics/manager/StatNodeRegistration.java
applications/statistics-manager/src/main/java/org/opendaylight/openflowplugin/applications/statistics/manager/impl/StatAbstractListenCommit.java
applications/statistics-manager/src/main/java/org/opendaylight/openflowplugin/applications/statistics/manager/impl/StatAbstractNotifyCommit.java
applications/statistics-manager/src/main/java/org/opendaylight/openflowplugin/applications/statistics/manager/impl/StatListenCommitFlow.java
applications/statistics-manager/src/main/java/org/opendaylight/openflowplugin/applications/statistics/manager/impl/StatListenCommitGroup.java
applications/statistics-manager/src/main/java/org/opendaylight/openflowplugin/applications/statistics/manager/impl/StatListenCommitMeter.java
applications/statistics-manager/src/main/java/org/opendaylight/openflowplugin/applications/statistics/manager/impl/StatListenCommitQueue.java
applications/statistics-manager/src/main/java/org/opendaylight/openflowplugin/applications/statistics/manager/impl/StatNodeRegistrationImpl.java
applications/statistics-manager/src/main/java/org/opendaylight/openflowplugin/applications/statistics/manager/impl/StatNotifyCommitPort.java
applications/statistics-manager/src/main/java/org/opendaylight/openflowplugin/applications/statistics/manager/impl/StatNotifyCommitTable.java
applications/statistics-manager/src/main/java/org/opendaylight/openflowplugin/applications/statistics/manager/impl/StatPermCollectorImpl.java
applications/statistics-manager/src/main/java/org/opendaylight/openflowplugin/applications/statistics/manager/impl/StatisticsManagerImpl.java
applications/statistics-manager/src/test/java/org/opendaylight/openflowplugin/applications/statistics/manager/impl/StatAbstractListenCommitTest.java
applications/statistics-manager/src/test/java/org/opendaylight/openflowplugin/applications/statistics/manager/impl/StatListenCommitFlowTest.java
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/ConnectionConductorImpl.java
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/role/OfEntityManager.java
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/util/RoleUtil.java

index 178befea06e3124014fee016a0e8b108ce0b5d6b..4defa148c678efd4af9c072074cde564e4c0f41b 100644 (file)
@@ -9,6 +9,7 @@
 package org.opendaylight.openflowplugin.applications.statistics.manager;
 
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.flow.node.SwitchFeatures;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.OpendaylightInventoryListener;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
@@ -47,4 +48,11 @@ public interface StatNodeRegistration extends OpendaylightInventoryListener, Aut
      * @param keyIdent
      */
     void disconnectFlowCapableNode(InstanceIdentifier<Node> keyIdent);
+
+    /**
+     * Method returns if *this* instance of the stats-manager is owner of the node
+     * @param node Given Node
+     * @return true if owner, else false
+     */
+    boolean isFlowCapableNodeOwner(NodeId node);
 }
index 66f5660c7d162a0e90d92cfbed988945ab59f419..0b95b2b9b4d4324ba16930a2af27e497c374206b 100644 (file)
@@ -21,7 +21,9 @@ import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
 import org.opendaylight.openflowplugin.applications.statistics.manager.StatListeningCommiter;
+import org.opendaylight.openflowplugin.applications.statistics.manager.StatNodeRegistration;
 import org.opendaylight.openflowplugin.applications.statistics.manager.StatisticsManager;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.binding.DataObject;
@@ -55,18 +57,21 @@ public abstract class StatAbstractListenCommit<T extends DataObject, N extends N
 
     private final DataBroker dataBroker;
 
+    protected final StatNodeRegistration nodeRegistrationManager;
+
     private ReadOnlyTransaction currentReadTx;
     private volatile boolean currentReadTxStale;
 
     /* Constructor has to make a registration */
     public StatAbstractListenCommit(final StatisticsManager manager, final DataBroker db,
-            final NotificationProviderService nps, final Class<T> clazz) {
-        super(manager,nps);
+            final NotificationProviderService nps, final Class<T> clazz, final StatNodeRegistration nodeRegistrationManager) {
+        super(manager,nps, nodeRegistrationManager);
         this.clazz = Preconditions.checkNotNull(clazz, "Referenced Class can not be null");
         Preconditions.checkArgument(db != null, "DataBroker can not be null!");
         listenerRegistration = db.registerDataChangeListener(LogicalDatastoreType.CONFIGURATION,
                 getWildCardedRegistrationPath(), this, DataChangeScope.BASE);
         this.dataBroker = db;
+        this.nodeRegistrationManager = nodeRegistrationManager;
     }
 
     /**
@@ -160,5 +165,6 @@ public abstract class StatAbstractListenCommit<T extends DataObject, N extends N
 
         return Optional.absent();
     }
+
 }
 
index ad866feb6d96dc12353bf6555638d2f5f8b9329b..dcb64734cf30ea70b2d9066f7da572cda44df96e 100644 (file)
@@ -13,6 +13,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
+import org.opendaylight.openflowplugin.applications.statistics.manager.StatNodeRegistration;
 import org.opendaylight.openflowplugin.applications.statistics.manager.StatNotifyCommiter;
 import org.opendaylight.openflowplugin.applications.statistics.manager.StatRpcMsgManager.TransactionCacheContainer;
 import org.opendaylight.openflowplugin.applications.statistics.manager.StatisticsManager;
@@ -46,12 +47,16 @@ public abstract class StatAbstractNotifyCommit<N extends NotificationListener> i
 
     protected final StatisticsManager manager;
     private ListenerRegistration<NotificationListener> notifyListenerRegistration;
+    protected final StatNodeRegistration nodeRegistrationManager;
+
 
     public StatAbstractNotifyCommit(final StatisticsManager manager,
-            final NotificationProviderService nps) {
+            final NotificationProviderService nps,
+                                    final StatNodeRegistration nodeRegistrationManager) {
         Preconditions.checkArgument(nps != null, "NotificationProviderService can not be null!");
         this.manager = Preconditions.checkNotNull(manager, "StatisticManager can not be null!");
         notifyListenerRegistration = nps.registerNotificationListener(getStatNotificationListener());
+        this.nodeRegistrationManager = nodeRegistrationManager;
     }
 
     @Override
index 5fce2b9f761a760abe980106add2ec1fb48ae8ab..63f08c79dd9d173f8c7d8069705b6db4e62ff6af 100644 (file)
@@ -25,6 +25,7 @@ import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
+import org.opendaylight.openflowplugin.applications.statistics.manager.StatNodeRegistration;
 import org.opendaylight.openflowplugin.applications.statistics.manager.StatRpcMsgManager.TransactionCacheContainer;
 import org.opendaylight.openflowplugin.applications.statistics.manager.StatisticsManager;
 import org.opendaylight.openflowplugin.applications.statistics.manager.StatisticsManager.StatDataStoreOperation;
@@ -92,8 +93,9 @@ public class StatListenCommitFlow extends StatAbstractListenCommit<Flow, Openday
     private final AtomicInteger unaccountedFlowsCounter = new AtomicInteger(0);
 
     public StatListenCommitFlow (final StatisticsManager manager, final DataBroker db,
-            final NotificationProviderService nps){
-        super(manager, db, nps, Flow.class);
+            final NotificationProviderService nps,
+                                 final StatNodeRegistration nrm){
+        super(manager, db, nps, Flow.class,nrm);
     }
 
     @Override
@@ -132,6 +134,9 @@ public class StatListenCommitFlow extends StatAbstractListenCommit<Flow, Openday
                 if (( ! inputObj.isPresent()) || ( ! (inputObj.get() instanceof Table))) {
                     return;
                 }
+
+                if(!nodeRegistrationManager.isFlowCapableNodeOwner(nodeId)) { return; }
+
                 final Table table = (Table) inputObj.get();
                 final List<? extends TransactionAware> cacheNotifs = txContainer.get().getNotifications();
                 for (final TransactionAware notif : cacheNotifs) {
@@ -191,6 +196,8 @@ public class StatListenCommitFlow extends StatAbstractListenCommit<Flow, Openday
                 if (( ! txContainer.isPresent()) || txContainer.get().getNotifications() == null) {
                     return;
                 }
+                if(!nodeRegistrationManager.isFlowCapableNodeOwner(nodeId)) { return; }
+
                 final List<FlowAndStatisticsMapList> flowStats = new ArrayList<FlowAndStatisticsMapList>(10);
                 final InstanceIdentifier<Node> nodeIdent = InstanceIdentifier.create(Nodes.class)
                         .child(Node.class, new NodeKey(nodeId));
index f30984ffab4228dd7b9f5e847fab42b96f69cac5..a274e6b16256cc72cc7de1ee9b89c53133b12bd1 100644 (file)
@@ -18,6 +18,7 @@ import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
+import org.opendaylight.openflowplugin.applications.statistics.manager.StatNodeRegistration;
 import org.opendaylight.openflowplugin.applications.statistics.manager.StatPermCollector.StatCapabTypes;
 import org.opendaylight.openflowplugin.applications.statistics.manager.StatRpcMsgManager.TransactionCacheContainer;
 import org.opendaylight.openflowplugin.applications.statistics.manager.StatisticsManager;
@@ -76,8 +77,9 @@ public class StatListenCommitGroup extends StatAbstractListenCommit<Group, Opend
     private static final Logger LOG = LoggerFactory.getLogger(StatListenCommitMeter.class);
 
     public StatListenCommitGroup(final StatisticsManager manager,  final DataBroker db,
-            final NotificationProviderService nps) {
-        super(manager, db, nps, Group.class);
+            final NotificationProviderService nps,
+                                 final StatNodeRegistration nrm) {
+        super(manager, db, nps, Group.class,nrm);
     }
 
     @Override
@@ -127,6 +129,9 @@ public class StatListenCommitGroup extends StatAbstractListenCommit<Group, Opend
                 if ( ! isTransactionCacheContainerValid(txContainer)) {
                     return;
                 }
+
+                if(!nodeRegistrationManager.isFlowCapableNodeOwner(nodeId)) { return; }
+
                 /* Prepare List actual Groups and not updated Groups will be removed */
                 final List<Group> existGroups = fNode.get().getGroup() != null
                         ? fNode.get().getGroup() : Collections.<Group> emptyList();
@@ -173,6 +178,8 @@ public class StatListenCommitGroup extends StatAbstractListenCommit<Group, Opend
                     return;
                 }
 
+                if(!nodeRegistrationManager.isFlowCapableNodeOwner(nodeId)) { return; }
+
                 final InstanceIdentifier<Node> nodeIdent = InstanceIdentifier
                         .create(Nodes.class).child(Node.class, new NodeKey(nodeId));
 
@@ -246,6 +253,9 @@ public class StatListenCommitGroup extends StatAbstractListenCommit<Group, Opend
                 if ( ! isTransactionCacheContainerValid(txContainer)) {
                     return;
                 }
+
+                if(!nodeRegistrationManager.isFlowCapableNodeOwner(nodeId)) { return; }
+
                 final List<? extends TransactionAware> cacheNotifs = txContainer.get().getNotifications();
 
                 Optional<Group> notifGroup = Optional.absent();
index e0a8ec3267b800c12115c40b66220e6759aed36c..0674bc9a53ad66528322cf8bce699c312a3f8a7e 100644 (file)
@@ -18,6 +18,7 @@ import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
+import org.opendaylight.openflowplugin.applications.statistics.manager.StatNodeRegistration;
 import org.opendaylight.openflowplugin.applications.statistics.manager.StatPermCollector.StatCapabTypes;
 import org.opendaylight.openflowplugin.applications.statistics.manager.StatRpcMsgManager.TransactionCacheContainer;
 import org.opendaylight.openflowplugin.applications.statistics.manager.StatisticsManager;
@@ -74,8 +75,9 @@ public class StatListenCommitMeter extends StatAbstractListenCommit<Meter, Opend
     private static final Logger LOG = LoggerFactory.getLogger(StatListenCommitMeter.class);
 
     public StatListenCommitMeter(final StatisticsManager manager, final DataBroker db,
-            final NotificationProviderService nps) {
-        super(manager, db, nps, Meter.class);
+            final NotificationProviderService nps,
+                                 final StatNodeRegistration nrm) {
+        super(manager, db, nps, Meter.class,nrm);
     }
 
     @Override
@@ -127,6 +129,9 @@ public class StatListenCommitMeter extends StatAbstractListenCommit<Meter, Opend
                 if ( ! isTransactionCacheContainerValid(txContainer)) {
                     return;
                 }
+
+                if(!nodeRegistrationManager.isFlowCapableNodeOwner(nodeId)) { return; }
+
                 /* Prepare List actual Meters and not updated Meters will be removed */
                 final List<Meter> existMeters = fNode.get().getMeter() != null
                         ? fNode.get().getMeter() : Collections.<Meter> emptyList();
@@ -172,6 +177,8 @@ public class StatListenCommitMeter extends StatAbstractListenCommit<Meter, Opend
                     return;
                 }
 
+                if(!nodeRegistrationManager.isFlowCapableNodeOwner(nodeId)) { return; }
+
                 final InstanceIdentifier<Node> nodeIdent = InstanceIdentifier
                         .create(Nodes.class).child(Node.class, new NodeKey(nodeId));
 
@@ -245,6 +252,8 @@ public class StatListenCommitMeter extends StatAbstractListenCommit<Meter, Opend
                 if ( ! isTransactionCacheContainerValid(txContainer)) {
                     return;
                 }
+                if(!nodeRegistrationManager.isFlowCapableNodeOwner(nodeId)) { return; }
+
                 final List<? extends TransactionAware> cacheNotifs = txContainer.get().getNotifications();
 
                 Optional<Meter> notifMeter = Optional.absent();
index af8cb213f50fe8308a7b8be4aedb331c451ac060..db980dc1ee1a29d618a7a4e1ada2953cfd5eed66 100644 (file)
@@ -22,6 +22,7 @@ import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
+import org.opendaylight.openflowplugin.applications.statistics.manager.StatNodeRegistration;
 import org.opendaylight.openflowplugin.applications.statistics.manager.StatRpcMsgManager.TransactionCacheContainer;
 import org.opendaylight.openflowplugin.applications.statistics.manager.StatisticsManager;
 import org.opendaylight.openflowplugin.applications.statistics.manager.StatisticsManager.StatDataStoreOperation;
@@ -67,8 +68,9 @@ public class StatListenCommitQueue extends StatAbstractListenCommit<Queue, Opend
     private static final Logger LOG = LoggerFactory.getLogger(StatListenCommitQueue.class);
 
     public StatListenCommitQueue(final StatisticsManager manager, final DataBroker db,
-            final NotificationProviderService nps) {
-        super(manager, db, nps, Queue.class);
+            final NotificationProviderService nps,
+                                 final StatNodeRegistration nrm) {
+        super(manager, db, nps, Queue.class,nrm);
     }
 
     @Override
@@ -121,15 +123,20 @@ public class StatListenCommitQueue extends StatAbstractListenCommit<Queue, Opend
                 if ( ! isTransactionCacheContainerValid(txContainer)) {
                     return;
                 }
+
+                if(!nodeRegistrationManager.isFlowCapableNodeOwner(nodeId)) { return; }
+
                 /* Prepare List actual Queues and not updated Queues will be removed */
                 final List<NodeConnector> existConnectors = fNode.get().getNodeConnector() != null
                         ? fNode.get().getNodeConnector() : Collections.<NodeConnector> emptyList();
                 final Map<QueueKey, NodeConnectorKey> existQueueKeys = new HashMap<>();
                 for (final NodeConnector connect : existConnectors) {
-                    final List<Queue> listQueues = connect.getAugmentation(FlowCapableNodeConnector.class).getQueue();
-                    if (listQueues != null) {
-                        for (final Queue queue : listQueues) {
-                            existQueueKeys.put(queue.getKey(), connect.getKey());
+                    if(connect.getAugmentation(FlowCapableNodeConnector.class) != null){
+                        final List<Queue> listQueues = connect.getAugmentation(FlowCapableNodeConnector.class).getQueue();
+                        if (listQueues != null) {
+                            for (final Queue queue : listQueues) {
+                                existQueueKeys.put(queue.getKey(), connect.getKey());
+                            }
                         }
                     }
                 }
index 3e650d9489da349a21fcf71cfd366f1b6ec8479b..e71f3d1ecb56a415b28766336a3b80e8598d8a4a 100644 (file)
@@ -10,9 +10,14 @@ package org.opendaylight.openflowplugin.applications.statistics.manager.impl;
 
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
+
+import java.math.BigInteger;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
 import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
@@ -42,6 +47,9 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeUpd
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -57,10 +65,16 @@ public class StatNodeRegistrationImpl implements StatNodeRegistration,EntityOwne
 
     private static final Logger LOG = LoggerFactory.getLogger(StatNodeRegistrationImpl.class);
 
+    private static final QName ENTITY_QNAME =
+            org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.core.general.entity.rev150820.Entity.QNAME;
+    private static final QName ENTITY_NAME = QName.create(ENTITY_QNAME, "name");
+
     private final StatisticsManager manager;
     private ListenerRegistration<?> notifListenerRegistration;
     //private DataBroker db;
     private EntityOwnershipListenerRegistration ofListenerRegistration = null;
+    private final Map<NodeId, Boolean> nodeOwnershipState = new ConcurrentHashMap();
+
 
     public StatNodeRegistrationImpl(final StatisticsManager manager, final DataBroker db,
             final NotificationProviderService notificationService) {
@@ -138,10 +152,9 @@ public class StatNodeRegistrationImpl implements StatNodeRegistration,EntityOwne
         manager.disconnectedNodeUnregistration(nodeIdent);
     }
 
-    private boolean preConfigurationCheck(final InstanceIdentifier<Node> nodeIdent) {
-        Preconditions.checkNotNull(nodeIdent, "Node Instance Identifier can not be null!");
-        NodeId nodeId = InstanceIdentifier.keyOf(nodeIdent).getId();
-        final Entity entity = new Entity("openflow", nodeId.getValue());
+    private boolean preConfigurationCheck(final NodeId nodeId) {
+        Preconditions.checkNotNull(nodeId, "Node Instance Identifier can not be null!");
+        final Entity entity = getEntity(nodeId);
         EntityOwnershipService ownershipService = manager.getOwnershipService();
         if(ownershipService == null) {
             LOG.error("preConfigurationCheck: EntityOwnershipService is null");
@@ -179,6 +192,7 @@ public class StatNodeRegistrationImpl implements StatNodeRegistration,EntityOwne
                 nodeRefIdent.firstIdentifierOf(Node.class);
         if (nodeIdent != null) {
             LOG.debug("Received onNodeRemoved for node:{} ", nodeIdent);
+            removeOwnership(InstanceIdentifier.keyOf(nodeIdent).getId());
             disconnectFlowCapableNode(nodeIdent);
         }
     }
@@ -201,7 +215,10 @@ public class StatNodeRegistrationImpl implements StatNodeRegistration,EntityOwne
             connectFlowCapableNode(swichFeaturesIdent, switchFeatures, nodeIdent);
 
             //Send group/meter request to get addition details not present in switch feature response.
-            if(preConfigurationCheck(nodeIdent)) {
+            NodeId nodeId = InstanceIdentifier.keyOf(nodeIdent).getId();
+            boolean ownershipState = preConfigurationCheck(nodeId);
+            setNodeOwnership(nodeId, ownershipState);
+            if(ownershipState) {
                 LOG.info("onNodeUpdated: Send group/meter feature request to the device {}",nodeIdent);
                 manager.getRpcMsgManager().getGroupFeaturesStat(nodeRef);
                 manager.getRpcMsgManager().getMeterFeaturesStat(nodeRef);
@@ -209,14 +226,37 @@ public class StatNodeRegistrationImpl implements StatNodeRegistration,EntityOwne
         }
     }
 
+    @Override
+    public boolean isFlowCapableNodeOwner(NodeId node) {
+        if(this.nodeOwnershipState.containsKey(node)){
+            return this.nodeOwnershipState.get(node).booleanValue();
+        }
+        return false;
+    }
+
+
     @Override
     public void ownershipChanged(EntityOwnershipChange ownershipChange) {
-        //I believe the only scenario we need to handle here is
-        // isOwner=false && hasOwner=false. E.g switch is connected to only
-        // one controller and that goes down, all other controller will get
-        // notification about ownership change with the flag set as above.
-        // In this scenario, topology manager should remove the node from
-        // operational data store, so no explict action is required here.
+
+        YangInstanceIdentifier yId = ownershipChange.getEntity().getId();
+        NodeIdentifierWithPredicates niWPredicates = (NodeIdentifierWithPredicates)yId.getLastPathArgument();
+        Map<QName, Object> keyValMap = niWPredicates.getKeyValues();
+        String nodeIdStr = (String)(keyValMap.get(ENTITY_NAME));
+        BigInteger dpId = new BigInteger(nodeIdStr.split(":")[1]);
+        NodeId nodeId = new NodeId(nodeIdStr);
+        setNodeOwnership(nodeId, ownershipChange.isOwner());
+    }
+
+    private void setNodeOwnership(NodeId node, boolean ownership) {
+        this.nodeOwnershipState.put(node,ownership);
+    }
+
+    private void removeOwnership(NodeId node) {
+        this.nodeOwnershipState.remove(node);
+    }
+
+    private Entity getEntity(NodeId nodeId) {
+        return new Entity("openflow", nodeId.getValue());
     }
 
 }
index 61a8dc03ca149fbbdddbbaa1e3f69068ace25f64..77e00c94227e707f50bad54f8acc90d1419e2086 100644 (file)
@@ -16,6 +16,7 @@ import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
+import org.opendaylight.openflowplugin.applications.statistics.manager.StatNodeRegistration;
 import org.opendaylight.openflowplugin.applications.statistics.manager.StatRpcMsgManager.TransactionCacheContainer;
 import org.opendaylight.openflowplugin.applications.statistics.manager.StatisticsManager;
 import org.opendaylight.openflowplugin.applications.statistics.manager.StatisticsManager.StatDataStoreOperation;
@@ -60,8 +61,9 @@ public class StatNotifyCommitPort extends StatAbstractNotifyCommit<OpendaylightP
     private static final Logger LOG = LoggerFactory.getLogger(StatNotifyCommitPort.class);
 
     public StatNotifyCommitPort(final StatisticsManager manager,
-            final NotificationProviderService nps) {
-        super(manager, nps);
+            final NotificationProviderService nps,
+                                final StatNodeRegistration nrm) {
+        super(manager, nps,nrm);
     }
 
     @Override
@@ -91,6 +93,9 @@ public class StatNotifyCommitPort extends StatAbstractNotifyCommit<OpendaylightP
                 if (( ! txContainer.isPresent()) || txContainer.get().getNotifications() == null) {
                     return;
                 }
+
+                if(!nodeRegistrationManager.isFlowCapableNodeOwner(nodeId)) { return; }
+
                 final List<NodeConnectorStatisticsAndPortNumberMap> portStats =
                         new ArrayList<NodeConnectorStatisticsAndPortNumberMap>(10);
                 final List<? extends TransactionAware> cachedNotifs = txContainer.get().getNotifications();
index d5bd27e23066d6ecf0797fb35caf5edb8969433e..0bfeaaa4b0e39119279c17c6e63f8024cbc76eae 100644 (file)
@@ -16,6 +16,7 @@ import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
+import org.opendaylight.openflowplugin.applications.statistics.manager.StatNodeRegistration;
 import org.opendaylight.openflowplugin.applications.statistics.manager.StatRpcMsgManager.TransactionCacheContainer;
 import org.opendaylight.openflowplugin.applications.statistics.manager.StatisticsManager;
 import org.opendaylight.openflowplugin.applications.statistics.manager.StatisticsManager.StatDataStoreOperation;
@@ -61,8 +62,9 @@ public class StatNotifyCommitTable extends StatAbstractNotifyCommit<Opendaylight
     private static final Logger LOG = LoggerFactory.getLogger(StatNotifyCommitTable.class);
 
     public StatNotifyCommitTable(final StatisticsManager manager,
-            final NotificationProviderService nps) {
-        super(manager, nps);
+            final NotificationProviderService nps,
+                                 final StatNodeRegistration nrm) {
+        super(manager, nps, nrm);
     }
 
     @Override
@@ -93,6 +95,9 @@ public class StatNotifyCommitTable extends StatAbstractNotifyCommit<Opendaylight
                 if (( ! txContainer.isPresent()) || txContainer.get().getNodeId() == null) {
                     return;
                 }
+
+                if(!nodeRegistrationManager.isFlowCapableNodeOwner(nodeId)) { return; }
+
                 final List<? extends TransactionAware> cachedNotifs = txContainer.get().getNotifications();
                 for (final TransactionAware notif : cachedNotifs) {
                     if (notif instanceof FlowTableStatisticsUpdate) {
index a740c21c4bc9dedfccd324ad47cdfe7ba9f0430a..59c128748241d6b7999d71fcbc42b78272980e69 100644 (file)
@@ -311,11 +311,11 @@ public class StatPermCollectorImpl implements StatPermCollector {
                         LOG.trace("STAT-MANAGER-collecting FLOW-STATS-ALL_FLOWS for NodeRef {}", actualNodeRef);
                         setActualTransactionId(manager.getRpcMsgManager().getAllFlowsStat(actualNodeRef).get());
                         waitingForNotification();
-                        LOG.trace("STAT-MANAGER-collecting FLOW-AGGREGATE-STATS for NodeRef {}", actualNodeRef);
+                        /*LOG.trace("STAT-MANAGER-collecting FLOW-AGGREGATE-STATS for NodeRef {}", actualNodeRef);
                         for (short i = 0; i < maxTables; i++) {
                             final TableId tableId = new TableId(i);
                             manager.getRpcMsgManager().getAggregateFlowStat(actualNodeRef, tableId);
-                        }
+                        }*/
                         break;
                     default:
                         /* Exception for programmers in implementation cycle */
@@ -330,21 +330,7 @@ public class StatPermCollectorImpl implements StatPermCollector {
     }
 
     private boolean isThisInstanceNodeOwner(NodeId nodeId) {
-        final Entity deviceEntity = new Entity("openflow",nodeId.getValue());
-        if(manager.getOwnershipService().isCandidateRegistered(deviceEntity)) {
-            Optional<EntityOwnershipState> deviceOwnershipState = manager.getOwnershipService()
-                    .getOwnershipState(deviceEntity);
-
-            if(deviceOwnershipState.isPresent()) {
-                return deviceOwnershipState.get().isOwner();
-            } else {
-                LOG.error("Node {} is connected to the controller but ownership state is missing.");
-            }
-        } else {
-            LOG.warn("Node {} is connected to the controller but it did not" +
-                    "registered for the device ownership.",nodeId);
-        }
-        return false;
+        return manager.getNodeRegistrator().isFlowCapableNodeOwner(nodeId);
     }
 
     private class StatNodeInfoHolder {
index 39294f8f2a9600312da7bff18d87184359c7e5c5..9730643d9b3975b1ee58340792c798ac2d3c2c89 100644 (file)
@@ -79,7 +79,7 @@ public class StatisticsManagerImpl implements StatisticsManager, Runnable {
    private AtomicInteger numNodesBeingCollected = new AtomicInteger(0);
 
 
-   private final DataBroker dataBroker;
+    private final DataBroker dataBroker;
    private final ExecutorService statRpcMsgManagerExecutor;
    private final ExecutorService statDataStoreOperationServ;
    private EntityOwnershipService ownershipService;
@@ -117,12 +117,12 @@ public class StatisticsManagerImpl implements StatisticsManager, Runnable {
        rpcMsgManager = new StatRpcMsgManagerImpl(this, rpcRegistry, statManagerConfig.getMaxNodesForCollector());
        statCollectors = Collections.emptyList();
        nodeRegistrator = new StatNodeRegistrationImpl(this, dataBroker, notifService);
-       flowListeningCommiter = new StatListenCommitFlow(this, dataBroker, notifService);
-       meterListeningCommiter = new StatListenCommitMeter(this, dataBroker, notifService);
-       groupListeningCommiter = new StatListenCommitGroup(this, dataBroker, notifService);
-       tableNotifCommiter = new StatNotifyCommitTable(this, notifService);
-       portNotifyCommiter = new StatNotifyCommitPort(this, notifService);
-       queueNotifyCommiter = new StatListenCommitQueue(this, dataBroker, notifService);
+       flowListeningCommiter = new StatListenCommitFlow(this, dataBroker, notifService, nodeRegistrator);
+       meterListeningCommiter = new StatListenCommitMeter(this, dataBroker, notifService, nodeRegistrator);
+       groupListeningCommiter = new StatListenCommitGroup(this, dataBroker, notifService, nodeRegistrator);
+       tableNotifCommiter = new StatNotifyCommitTable(this, notifService, nodeRegistrator);
+       portNotifyCommiter = new StatNotifyCommitPort(this, notifService, nodeRegistrator);
+       queueNotifyCommiter = new StatListenCommitQueue(this, dataBroker, notifService, nodeRegistrator);
 
        statRpcMsgManagerExecutor.execute(rpcMsgManager);
        statDataStoreOperationServ.execute(this);
@@ -410,5 +410,6 @@ public class StatisticsManagerImpl implements StatisticsManager, Runnable {
     public EntityOwnershipService getOwnershipService() {
         return this.ownershipService;
     }
+
 }
 
index cbddfbf1cff4b83731d32d6148949be9a75fb766..195dbcee75cd9193bfae705cd833041f4d2a0aab 100644 (file)
@@ -30,6 +30,7 @@ import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
+import org.opendaylight.openflowplugin.applications.statistics.manager.StatNodeRegistration;
 import org.opendaylight.openflowplugin.applications.statistics.manager.StatisticsManager;
 import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
@@ -54,6 +55,10 @@ public class StatAbstractListenCommitTest {
     @Mock
     private NotificationListener mockNotificationListener;
 
+    @Mock
+    private StatNodeRegistration statsNodeRegistration;
+
+
     @SuppressWarnings("rawtypes")
     private StatAbstractListenCommit statCommit;
 
@@ -63,7 +68,7 @@ public class StatAbstractListenCommitTest {
         MockitoAnnotations.initMocks(this);
 
         statCommit = new StatAbstractListenCommit(mockStatisticsManager, mockDataBroker,
-                mockNotificationProviderService, DataObject.class) {
+                mockNotificationProviderService, DataObject.class, statsNodeRegistration) {
             @Override
             protected InstanceIdentifier getWildCardedRegistrationPath() {
                 return InstanceIdentifier.create(DataObject.class);
index 9d0a5ef5743f5f39fc064a0fc4d0b11024fa153e..c70130cb3c99a01e4e6a116fd439de0530fa3f10 100644 (file)
@@ -26,6 +26,7 @@ import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
+import org.opendaylight.openflowplugin.applications.statistics.manager.StatNodeRegistration;
 import org.opendaylight.openflowplugin.applications.statistics.manager.StatisticsManager;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeBuilder;
@@ -72,6 +73,9 @@ public class StatListenCommitFlowTest {
     @Mock
     private DataBroker mockDataBroker;
 
+    @Mock
+    private StatNodeRegistration statsNodeRegistration;
+
     private StatListenCommitFlow statCommitFlow;
     private TableKey tableKey = new TableKey((short) 12);
 
@@ -79,7 +83,7 @@ public class StatListenCommitFlowTest {
     public void init() {
         MockitoAnnotations.initMocks(this);
         statCommitFlow = new StatListenCommitFlow(mockStatisticsManager, mockDataBroker,
-                mockNotificationProviderService);
+                mockNotificationProviderService, statsNodeRegistration);
     }
 
     @Test
index d6852e12d7077fbc957db9edfe94398358f2adae..e9ad1e3d1ba04e4fb14cdaac51ce850ded1fe204 100644 (file)
@@ -492,7 +492,6 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener,
         hsPool.purge();
         conductorState = CONDUCTOR_STATE.WORKING;
         QueueKeeperFactory.plugQueue(queueProcessor, queue);
-        OFSessionUtil.setRole(sessionContext);
     }
 
     /**
index ab0dca8e1c1b9ff935b5d81c776721ea0639c884..ec26a54a88b563ddaeaf794852f97eb1ca4f9835 100644 (file)
@@ -9,6 +9,7 @@
 package org.opendaylight.openflowplugin.openflow.md.core.role;
 
 import java.math.BigInteger;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import org.opendaylight.controller.md.sal.common.api.clustering.CandidateAlreadyRegisteredException;
 import com.google.common.base.Optional;
@@ -68,7 +69,7 @@ public class OfEntityManager implements TransactionChainListener{
         entsession = new ConcurrentHashMap<>();
         entRegistrationMap = new ConcurrentHashMap<>();
         ThreadPoolLoggingExecutor delegate = new ThreadPoolLoggingExecutor(
-            1, 5, 0, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(5), "ofEntity");
+            20, 20, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), "ofEntity");
         pool =  MoreExecutors.listeningDecorator(delegate);
     }
 
@@ -215,6 +216,7 @@ public class OfEntityManager implements TransactionChainListener{
                 LOG.info("onDeviceOwnershipChanged: Set controller as a SLAVE controller because " +
                         "it's not the OWNER of the {}", entity);
                 entsession.get(entity).getOfSwitch().setEntityOwnership(newRole==OfpRole.BECOMEMASTER);
+                setSlaveRole(sessionContext);
                 sendNodeAddedNotification(entsession.get(entity));
                 if(ownershipChange.wasOwner()) {
                     deregisterRoutedRPCForSwitch(entsession.get(entity));
index 8c9b49a6dd84526040995bd9f634afd337cbf5f0..f335f11ebd1ea38859f3bedbbb3fae55d491bd2d 100644 (file)
@@ -123,7 +123,9 @@ public final class RoleUtil {
         Futures.addCallback(JdkFutureAdapters.listenInPoolThread(roleReply), new FutureCallback<RpcResult<RoleRequestOutput>>() {
             @Override
             public void onSuccess(RpcResult<RoleRequestOutput> input) {
-                result.set(input.getResult().getGenerationId());
+                if(input != null && input.getResult() != null) {
+                    result.set(input.getResult().getGenerationId());
+                }
             }
             @Override
             public void onFailure(Throwable t) {