/**
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ * Copyright (c) 2014, 2017 Cisco Systems, Inc. and others. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
package org.opendaylight.openflowplugin.applications.frm;
-import org.opendaylight.controller.md.sal.binding.api.ClusteredDataTreeChangeListener;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
/**
- * forwardingrules-manager
- * org.opendaylight.openflowplugin.applications.frm
- *
- * FlowNodeReconciliation
- * It represent Reconciliation functionality for every new device.
- * So we have to read all possible pre-configured Flows, Meters and Groups from
- * Config/DS and add all to new device.
- * New device is represented by new {@link org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode}
- * in Operational/DS. So we have to add listener for Wildcarded path in base data change scope.
- *
- * WildCarded InstanceIdentifier:
- * {@code
- *
- * InstanceIdentifier.create(Nodes.class).child(Node.class).augmentation(FlowCapableNode.class)
- *
- * }
+ * Implementation provider of this interface will implement reconciliation functionality for a newly connected node.
+ * Implementation is not enforced to do reconciliation in any specific way, but the higher level intention is to
+ * provide best effort reconciliation of all the configuration (flow/meter/group) present in configuration data store
+ * for the given node.
*
* @author <a href="mailto:vdemcak@cisco.com">Vaclav Demcak</a>
- *
- * Created: Aug 26, 2014
*/
-public interface FlowNodeReconciliation extends ClusteredDataTreeChangeListener<FlowCapableNode>, AutoCloseable {
-
- /**
- * Method contains Node registration to {@link ForwardingRulesManager} functionality
- * as a prevention to use a validation check to the Operational/DS for identify
- * pre-configure transaction and serious device commit in every transaction.
- *
- * Second part of functionality is own reconciliation pre-configure
- * Flows, Meters and Groups.
- *
- * @param connectedNode - {@link org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier} to new Node
- */
- void flowNodeConnected(InstanceIdentifier<FlowCapableNode> connectedNode);
+public interface FlowNodeReconciliation extends AutoCloseable {
/**
- * Method contains functionality for registered Node {@link FlowCapableNode} removing
- * from {@link ForwardingRulesManager}
- *
- * @param disconnectedNode - {@link org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier} to removed Node
+ * Reconcile the switch data store configuration on the switch
+ * @param connectedNode Node that need reconciliation
*/
- void flowNodeDisconnected(InstanceIdentifier<FlowCapableNode> disconnectedNode);
+ void reconcileConfiguration(InstanceIdentifier<FlowCapableNode> connectedNode);
}
/**
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ * Copyright (c) 2014, 2017 Cisco Systems, Inc. and others. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
/**
- * forwardingrules-manager
- * org.opendaylight.openflowplugin.applications.frm
- *
- * ForwardingRulesManager
- * It represents a central point for whole module. Implementation
+ * It represent a central point for whole module. Implementation
* Flow Provider registers the link FlowChangeListener} and it holds all needed
* services for link FlowChangeListener}.
*
* @author <a href="mailto:vdemcak@cisco.com">Vaclav Demcak</a>
- *
- * Created: Aug 25, 2014
*/
public interface ForwardingRulesManager extends AutoCloseable {
*/
boolean checkNodeInOperationalDataStore(InstanceIdentifier<FlowCapableNode> ident);
- /**
- * Method add new {@link FlowCapableNode} to active Node Holder.
- * ActiveNodeHolder prevent unnecessary Operational/DS read for identify
- * pre-configure and serious Configure/DS transactions.
- *
- * @param ident - the key of the node
- */
- void registrateNewNode(InstanceIdentifier<FlowCapableNode> ident);
-
- /**
- * Method remove disconnected {@link FlowCapableNode} from active Node
- * Holder. And all next flows or groups or meters will stay in Config/DS
- * only.
- *
- * @param ident - the key of the node
- */
- void unregistrateNode(InstanceIdentifier<FlowCapableNode> ident);
-
/**
* Method returns generated transaction ID, which is unique for
* every transaction. ID is composite from prefix ("DOM") and unique number.
*
* @return
*/
- public SalGroupService getSalGroupService();
+ SalGroupService getSalGroupService();
/**
* Meter RPC service
*/
ForwardingRulesCommiter<TableFeatures> getTableFeaturesCommiter();
- /**
- * Content definition method
- * @return FlowNodeReconciliation
- */
- FlowNodeReconciliation getFlowNodeReconciliation();
-
/**
* Returns the config-subsystem/fallback configuration of FRM
* @return ForwardingRulesManagerConfig
/**
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ * Copyright (c) 2014, 2017 Cisco Systems, Inc. and others. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
@Override
public void onDataTreeChanged(Collection<DataTreeModification<T>> changes) {
Preconditions.checkNotNull(changes, "Changes may not be null!");
+ LOG.trace("Received data changes :{}", changes);
for (DataTreeModification<T> change : changes) {
final InstanceIdentifier<T> key = change.getRootPath().getRootIdentifier();
final DataObjectModification<T> mod = change.getRootNode();
final InstanceIdentifier<FlowCapableNode> nodeIdent =
key.firstIdentifierOf(FlowCapableNode.class);
-
if (preConfigurationCheck(nodeIdent)) {
switch (mod.getModificationType()) {
case DELETE:
protected abstract InstanceIdentifier<T> getWildCardPath();
private boolean preConfigurationCheck(final InstanceIdentifier<FlowCapableNode> nodeIdent) {
- Preconditions.checkNotNull(nodeIdent, "FlowCapableNode ident can not be null!");
+ Preconditions.checkNotNull(nodeIdent, "FlowCapableNode identifier can not be null!");
// In single node cluster, node should be in local cache before we get any flow/group/meter
// data change event from data store. So first check should pass.
// In case of 3-node cluster, when shard leader changes, clustering will send blob of data
// should get populated. But to handle a scenario where flow request comes before the blob
// of config/operational data gets processes, it won't find node in local cache and it will
// skip the flow/group/meter operational. This requires an addition check, where it reads
- // node from operational data store and if it's present it calls flowNodeConnected to explictly
+ // node from operational data store and if it's present it calls flowNodeConnected to explicitly
// trigger the event of new node connected.
if(!provider.isNodeOwner(nodeIdent)) { return false; }
if (!provider.isNodeActive(nodeIdent)) {
if (provider.checkNodeInOperationalDataStore(nodeIdent)) {
- provider.getFlowNodeReconciliation().flowNodeConnected(nodeIdent);
return true;
} else {
return false;
/**
- * Copyright (c) 2016 Pantheon Technologies s.r.o. and others. All rights reserved.
+ * Copyright (c) 2016, 2017 Pantheon Technologies s.r.o. and others. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonService;
import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
+import org.opendaylight.openflowplugin.applications.frm.FlowNodeReconciliation;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private final NodeId nodeId;
private final ServiceGroupIdentifier identifier;
private final ClusterSingletonServiceProvider clusterSingletonServiceProvider;
+ private final FlowNodeReconciliation reconcliationAgent;
+ private final AtomicBoolean deviceMastered = new AtomicBoolean(false);
+ private final AtomicBoolean isDeviceInOperDS = new AtomicBoolean(false);
+ private final InstanceIdentifier<FlowCapableNode> fcnIID;
private ClusterSingletonServiceRegistration clusterSingletonServiceRegistration;
- private boolean deviceMastered;
- public DeviceMastership(final NodeId nodeId, final ClusterSingletonServiceProvider clusterSingletonService) {
+ public DeviceMastership(final NodeId nodeId,
+ final ClusterSingletonServiceProvider clusterSingletonService,
+ final FlowNodeReconciliation reconcliationAgent) {
this.nodeId = nodeId;
this.identifier = ServiceGroupIdentifier.create(nodeId.getValue());
- this.deviceMastered = false;
this.clusterSingletonServiceProvider = clusterSingletonService;
+ this.reconcliationAgent = reconcliationAgent;
+ fcnIID = InstanceIdentifier.create(Nodes.class).child(Node.class, new NodeKey(nodeId)).augmentation
+ (FlowCapableNode.class);
}
@Override
public void instantiateServiceInstance() {
LOG.info("FRM started for: {}", nodeId.getValue());
- deviceMastered = true;
+ deviceMastered.set(true);
+ if(canReconcile()) {
+ LOG.info("Triggering reconciliation for device {}", nodeId.getValue());
+ reconcliationAgent.reconcileConfiguration(fcnIID);
+ }
}
@Override
public ListenableFuture<Void> closeServiceInstance() {
LOG.info("FRM stopped for: {}", nodeId.getValue());
- deviceMastered = false;
+ deviceMastered.set(false);
return Futures.immediateFuture(null);
}
}
public boolean isDeviceMastered() {
- return deviceMastered;
+ return deviceMastered.get();
+ }
+
+ public void setDeviceOperationalStatus(boolean inOperDS) {
+ isDeviceInOperDS.set(inOperDS);
+ if(canReconcile()) {
+ LOG.info("Triggering reconciliation for device {}", nodeId.getValue());
+ reconcliationAgent.reconcileConfiguration(fcnIID);
+ }
}
public void registerClusterSingletonService() {
LOG.info("Registering FRM as a cluster singleton service listner for service id : {}",getIdentifier());
clusterSingletonServiceRegistration = clusterSingletonServiceProvider.registerClusterSingletonService(this);
}
+
+ private boolean canReconcile() {
+ return (deviceMastered.get() && isDeviceInOperDS.get());
+ }
}
/**
- * Copyright (c) 2016 Pantheon Technologies s.r.o. and others. All rights reserved.
+ * Copyright (c) 2016, 2017 Pantheon Technologies s.r.o. and others. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
package org.opendaylight.openflowplugin.applications.frm.impl;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-
+import javax.annotation.Nonnull;
+import org.opendaylight.controller.md.sal.binding.api.ClusteredDataTreeChangeListener;
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
+import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
+import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
+import org.opendaylight.openflowplugin.applications.frm.FlowNodeReconciliation;
+import org.opendaylight.openflowplugin.common.wait.SimpleTaskRetryLooper;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorRemoved;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorUpdated;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRemoved;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeUpdated;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.OpendaylightInventoryListener;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Manager for clustering service registrations of {@link DeviceMastership}.
*/
-public class DeviceMastershipManager implements OpendaylightInventoryListener, AutoCloseable{
+public class DeviceMastershipManager implements ClusteredDataTreeChangeListener<FlowCapableNode>,
+ OpendaylightInventoryListener, AutoCloseable{
private static final Logger LOG = LoggerFactory.getLogger(DeviceMastershipManager.class);
+ private static final InstanceIdentifier<FlowCapableNode> II_TO_FLOW_CAPABLE_NODE
+ = InstanceIdentifier.builder(Nodes.class)
+ .child(Node.class)
+ .augmentation(FlowCapableNode.class)
+ .build();
+
private final ClusterSingletonServiceProvider clusterSingletonService;
private final ListenerRegistration<?> notifListenerRegistration;
+ private final FlowNodeReconciliation reconcliationAgent;
+ private final DataBroker dataBroker;
private final ConcurrentHashMap<NodeId, DeviceMastership> deviceMasterships = new ConcurrentHashMap();
+ private final Object lockObj = new Object();
+ private ListenerRegistration<DeviceMastershipManager> listenerRegistration;
+ private Set<InstanceIdentifier<FlowCapableNode>> activeNodes = Collections.emptySet();
public DeviceMastershipManager(final ClusterSingletonServiceProvider clusterSingletonService,
- final NotificationProviderService notificationService) {
+ final NotificationProviderService notificationService,
+ final FlowNodeReconciliation reconcliationAgent,
+ final DataBroker dataBroker) {
this.clusterSingletonService = clusterSingletonService;
this.notifListenerRegistration = notificationService.registerNotificationListener(this);
+ this.reconcliationAgent = reconcliationAgent;
+ this.dataBroker = dataBroker;
+ registerNodeListener();
}
- public void onDeviceConnected(final NodeId nodeId) {
- //No-op
+ public boolean isDeviceMastered(final NodeId nodeId) {
+ return deviceMasterships.get(nodeId) != null && deviceMasterships.get(nodeId).isDeviceMastered();
}
- public void onDeviceDisconnected(final NodeId nodeId) {
- //No-op
- }
+ public boolean isNodeActive(final NodeId nodeId) {
+ final InstanceIdentifier<FlowCapableNode> flowNodeIdentifier = InstanceIdentifier.create(Nodes.class)
+ .child(Node.class, new NodeKey(nodeId)).augmentation(FlowCapableNode.class);
+ return activeNodes.contains(flowNodeIdentifier);
- public boolean isDeviceMastered(final NodeId nodeId) {
- return deviceMasterships.get(nodeId) != null && deviceMasterships.get(nodeId).isDeviceMastered();
}
@VisibleForTesting
public void onNodeUpdated(NodeUpdated notification) {
LOG.debug("NodeUpdate notification received : {}", notification);
DeviceMastership membership = deviceMasterships.computeIfAbsent(notification.getId(), device ->
- new DeviceMastership(notification.getId(), clusterSingletonService));
+ new DeviceMastership(notification.getId(), clusterSingletonService, reconcliationAgent));
membership.registerClusterSingletonService();
}
}
@Override
- public void close() throws Exception {
+ public void onDataTreeChanged(@Nonnull Collection<DataTreeModification<FlowCapableNode>> changes) {
+ Preconditions.checkNotNull(changes, "Changes may not be null!");
+
+ for (DataTreeModification<FlowCapableNode> change : changes) {
+ final InstanceIdentifier<FlowCapableNode> key = change.getRootPath().getRootIdentifier();
+ final DataObjectModification<FlowCapableNode> mod = change.getRootNode();
+ final InstanceIdentifier<FlowCapableNode> nodeIdent =
+ key.firstIdentifierOf(FlowCapableNode.class);
+
+ switch (mod.getModificationType()) {
+ case DELETE:
+ if (mod.getDataAfter() == null) {
+ remove(key, mod.getDataBefore(), nodeIdent);
+ }
+ break;
+ case SUBTREE_MODIFIED:
+ //NO-OP since we do not need to reconcile on Node-updated
+ break;
+ case WRITE:
+ if (mod.getDataBefore() == null) {
+ add(key, mod.getDataAfter(), nodeIdent);
+ }
+ break;
+ default:
+ throw new IllegalArgumentException("Unhandled modification type " + mod.getModificationType());
+ }
+ }
+ }
+
+ public void remove(InstanceIdentifier<FlowCapableNode> identifier, FlowCapableNode del,
+ InstanceIdentifier<FlowCapableNode> nodeIdent) {
+ if(compareInstanceIdentifierTail(identifier,II_TO_FLOW_CAPABLE_NODE)){
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Node removed: {}",nodeIdent.firstKeyOf(Node.class).getId().getValue());
+ }
+
+ if ( ! nodeIdent.isWildcarded()) {
+ if (activeNodes.contains(nodeIdent)) {
+ synchronized (lockObj) {
+ if (activeNodes.contains(nodeIdent)) {
+ Set<InstanceIdentifier<FlowCapableNode>> set =
+ Sets.newHashSet(activeNodes);
+ set.remove(nodeIdent);
+ activeNodes = Collections.unmodifiableSet(set);
+ setNodeOperationalStatus(nodeIdent,false);
+ }
+ }
+ }
+ }
+
+ }
+ }
+
+ public void add(InstanceIdentifier<FlowCapableNode> identifier, FlowCapableNode add,
+ InstanceIdentifier<FlowCapableNode> nodeIdent) {
+ if(compareInstanceIdentifierTail(identifier,II_TO_FLOW_CAPABLE_NODE)){
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Node added: {}",nodeIdent.firstKeyOf(Node.class).getId().getValue());
+ }
+
+ if ( ! nodeIdent.isWildcarded()) {
+ if (!activeNodes.contains(nodeIdent)) {
+ synchronized (lockObj) {
+ if (!activeNodes.contains(nodeIdent)) {
+ Set<InstanceIdentifier<FlowCapableNode>> set = Sets.newHashSet(activeNodes);
+ set.add(nodeIdent);
+ activeNodes = Collections.unmodifiableSet(set);
+ setNodeOperationalStatus(nodeIdent,true);
+ }
+ }
+ }
+ }
+ }
+ }
+
+ @Override
+ public void close() {
+ if (listenerRegistration != null) {
+ try {
+ listenerRegistration.close();
+ } catch (Exception e) {
+ LOG.warn("Error occurred while closing operational Node listener: {}", e.getMessage());
+ LOG.debug("Error occurred while closing operational Node listener", e);
+ }
+ listenerRegistration = null;
+ }
if (notifListenerRegistration != null) {
notifListenerRegistration.close();
}
}
+
+
+ private boolean compareInstanceIdentifierTail(InstanceIdentifier<?> identifier1,
+ InstanceIdentifier<?> identifier2) {
+ return Iterables.getLast(identifier1.getPathArguments()).equals(Iterables.getLast(identifier2.getPathArguments()));
+ }
+
+ private void setNodeOperationalStatus(InstanceIdentifier<FlowCapableNode> nodeIid, boolean status) {
+ NodeId nodeId = nodeIid.firstKeyOf(Node.class).getId();
+ if (nodeId != null ) {
+ if (deviceMasterships.containsKey(nodeId) ) {
+ deviceMasterships.get(nodeId).setDeviceOperationalStatus(status);
+ LOG.debug("Operational status of device {} is set to {}",nodeId, status);
+ }
+ }
+ }
+ private void registerNodeListener(){
+
+ final InstanceIdentifier<FlowCapableNode> flowNodeWildCardIdentifier = InstanceIdentifier.create(Nodes.class)
+ .child(Node.class).augmentation(FlowCapableNode.class);
+
+ final DataTreeIdentifier<FlowCapableNode> treeId =
+ new DataTreeIdentifier<>(LogicalDatastoreType.OPERATIONAL, flowNodeWildCardIdentifier);
+
+ try {
+ SimpleTaskRetryLooper looper = new SimpleTaskRetryLooper(ForwardingRulesManagerImpl.STARTUP_LOOP_TICK,
+ ForwardingRulesManagerImpl.STARTUP_LOOP_MAX_RETRIES);
+
+ listenerRegistration = looper.loopUntilNoException(() ->
+ dataBroker.registerDataTreeChangeListener(treeId, DeviceMastershipManager.this));
+ } catch (Exception e) {
+ LOG.warn("Data listener registration failed: {}", e.getMessage());
+ LOG.debug("Data listener registration failed ", e);
+ throw new IllegalStateException("Node listener registration failed!", e);
+ }
+ }
}
/**
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ * Copyright (c) 2014, 2017 Cisco Systems, Inc. and others. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.FutureCallback;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
-import java.util.Map.Entry;
-import java.util.HashMap;
-import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicInteger;
-import javax.annotation.Nonnull;
-
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
-import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
-import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.openflowplugin.applications.frm.FlowNodeReconciliation;
import org.opendaylight.openflowplugin.applications.frm.ForwardingRulesManager;
-import org.opendaylight.openflowplugin.common.wait.SimpleTaskRetryLooper;
import org.opendaylight.yang.gen.v1.urn.opendaylight.action.types.rev131112.action.action.GroupActionCase;
import org.opendaylight.yang.gen.v1.urn.opendaylight.action.types.rev131112.action.action.OutputActionCase;
import org.opendaylight.yang.gen.v1.urn.opendaylight.action.types.rev131112.action.list.Action;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.GroupKey;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.StaleGroup;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.StaleGroupKey;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.MeterId;
import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.table.features.TableFeatures;
import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.table.features.TableFeaturesKey;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
/**
- * forwardingrules-manager
- * org.opendaylight.openflowplugin.applications.frm
- *
- * FlowNode Reconciliation Listener
- * Reconciliation for a new FlowNode
+ * Default implementation of {@link ForwardingRulesManager}
*
* @author <a href="mailto:vdemcak@cisco.com">Vaclav Demcak</a>
- *
- * Created: Jun 13, 2014
*/
public class FlowNodeReconciliationImpl implements FlowNodeReconciliation {
+ private static final Logger LOG = LoggerFactory.getLogger(FlowNodeReconciliationImpl.class);
- private static final Logger LOG = LoggerFactory.getLogger(FlowNodeReconciliationImpl.class);
-
- /**
- * The number of nanoseconds to wait for a single group to be added.
- */
+ //The number of nanoseconds to wait for a single group to be added.
private static final long ADD_GROUP_TIMEOUT = TimeUnit.SECONDS.toNanos(3);
- /**
- * The maximum number of nanoseconds to wait for completion of add-group
- * RPCs.
- */
- private static final long MAX_ADD_GROUP_TIMEOUT =
- TimeUnit.SECONDS.toNanos(20);
-
- private final DataBroker dataBroker;
-
- private final ForwardingRulesManager provider;
+ //The maximum number of nanoseconds to wait for completion of add-group RPCs.
+ private static final long MAX_ADD_GROUP_TIMEOUT = TimeUnit.SECONDS.toNanos(20);
private static final String SEPARATOR = ":";
-
- private ListenerRegistration<FlowNodeReconciliationImpl> listenerRegistration;
-
private static final int THREAD_POOL_SIZE = 4;
- ExecutorService executor = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
- private static final InstanceIdentifier<FlowCapableNode> II_TO_FLOW_CAPABLE_NODE
- = InstanceIdentifier.builder(Nodes.class)
- .child(Node.class)
- .augmentation(FlowCapableNode.class)
- .build();
+ private final DataBroker dataBroker;
+ private final ForwardingRulesManager provider;
+ private final ExecutorService executor = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
public FlowNodeReconciliationImpl (final ForwardingRulesManager manager, final DataBroker db) {
this.provider = Preconditions.checkNotNull(manager, "ForwardingRulesManager can not be null!");
dataBroker = Preconditions.checkNotNull(db, "DataBroker can not be null!");
- /* Build Path */
- final InstanceIdentifier<FlowCapableNode> flowNodeWildCardIdentifier = InstanceIdentifier.create(Nodes.class)
- .child(Node.class).augmentation(FlowCapableNode.class);
-
- final DataTreeIdentifier<FlowCapableNode> treeId =
- new DataTreeIdentifier<>(LogicalDatastoreType.OPERATIONAL, flowNodeWildCardIdentifier);
-
- try {
- SimpleTaskRetryLooper looper = new SimpleTaskRetryLooper(ForwardingRulesManagerImpl.STARTUP_LOOP_TICK,
- ForwardingRulesManagerImpl.STARTUP_LOOP_MAX_RETRIES);
-
- listenerRegistration = looper.loopUntilNoException(new Callable<ListenerRegistration<FlowNodeReconciliationImpl>>() {
- @Override
- public ListenerRegistration<FlowNodeReconciliationImpl> call() throws Exception {
- return dataBroker.registerDataTreeChangeListener(treeId, FlowNodeReconciliationImpl.this);
- }
- });
- } catch (Exception e) {
- LOG.warn("data listener registration failed: {}", e.getMessage());
- LOG.debug("data listener registration failed.. ", e);
- throw new IllegalStateException("FlowNodeReconciliation startup fail! System needs restart.", e);
- }
}
@Override
public void close() {
- if (listenerRegistration != null) {
- try {
- listenerRegistration.close();
- } catch (Exception e) {
- LOG.warn("Error by stop FRM FlowNodeReconilListener: {}", e.getMessage());
- LOG.debug("Error by stop FRM FlowNodeReconilListener..", e);
- }
- listenerRegistration = null;
+ if (executor != null) {
+ executor.shutdownNow();
}
}
@Override
- public void onDataTreeChanged(@Nonnull Collection<DataTreeModification<FlowCapableNode>> changes) {
- Preconditions.checkNotNull(changes, "Changes may not be null!");
-
- for (DataTreeModification<FlowCapableNode> change : changes) {
- final InstanceIdentifier<FlowCapableNode> key = change.getRootPath().getRootIdentifier();
- final DataObjectModification<FlowCapableNode> mod = change.getRootNode();
- final InstanceIdentifier<FlowCapableNode> nodeIdent =
- key.firstIdentifierOf(FlowCapableNode.class);
-
- switch (mod.getModificationType()) {
- case DELETE:
- if (mod.getDataAfter() == null) {
- remove(key, mod.getDataBefore(), nodeIdent);
- }
- break;
- case SUBTREE_MODIFIED:
- //NO-OP since we donot need to reconciliate on Node-updated
- break;
- case WRITE:
- if (mod.getDataBefore() == null) {
- add(key, mod.getDataAfter(), nodeIdent);
- }
- break;
- default:
- throw new IllegalArgumentException("Unhandled modification type " + mod.getModificationType());
- }
- }
- }
-
-
-
- public void remove(InstanceIdentifier<FlowCapableNode> identifier, FlowCapableNode del,
- InstanceIdentifier<FlowCapableNode> nodeIdent) {
- if(compareInstanceIdentifierTail(identifier,II_TO_FLOW_CAPABLE_NODE)){
- if (LOG.isDebugEnabled()) {
- LOG.debug("Node removed: {}",nodeIdent.firstKeyOf(Node.class).getId().getValue());
- }
-
- if ( ! nodeIdent.isWildcarded()) {
- flowNodeDisconnected(nodeIdent);
- }
-
- }
- }
-
- public void add(InstanceIdentifier<FlowCapableNode> identifier, FlowCapableNode add,
- InstanceIdentifier<FlowCapableNode> nodeIdent) {
- if(compareInstanceIdentifierTail(identifier,II_TO_FLOW_CAPABLE_NODE)){
- if (LOG.isDebugEnabled()) {
- LOG.debug("Node added: {}",nodeIdent.firstKeyOf(Node.class).getId().getValue());
- }
-
- if ( ! nodeIdent.isWildcarded()) {
- flowNodeConnected(nodeIdent);
- }
- }
- }
-
- @Override
- public void flowNodeDisconnected(InstanceIdentifier<FlowCapableNode> disconnectedNode) {
- provider.unregistrateNode(disconnectedNode);
- }
-
- @Override
- public void flowNodeConnected(InstanceIdentifier<FlowCapableNode> connectedNode) {
- flowNodeConnected(connectedNode, false);
- }
-
- private void flowNodeConnected(InstanceIdentifier<FlowCapableNode> connectedNode, boolean force) {
- if (force || !provider.isNodeActive(connectedNode)) {
- provider.registrateNewNode(connectedNode);
-
- if(!provider.isNodeOwner(connectedNode)) { return; }
-
+ public void reconcileConfiguration(InstanceIdentifier<FlowCapableNode> connectedNode) {
+ if (provider.isNodeOwner(connectedNode)) {
if (provider.getConfiguration().isStaleMarkingEnabled()) {
LOG.info("Stale-Marking is ENABLED and proceeding with deletion of stale-marked entities on switch {}",
connectedNode.toString());
LOG.error("Stale entity removal failed {}", t);
}
});
-
- }
-
-
- private boolean compareInstanceIdentifierTail(InstanceIdentifier<?> identifier1,
- InstanceIdentifier<?> identifier2) {
- return Iterables.getLast(identifier1.getPathArguments()).equals(Iterables.getLast(identifier2.getPathArguments()));
}
}
/**
- * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
+ * Copyright (c) 2014, 2017 Cisco Systems, Inc. and others. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
-import com.google.common.collect.Sets;
import com.google.common.util.concurrent.CheckedFuture;
-import java.util.Collections;
import java.util.Objects;
-import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
*
*/
public class ForwardingRulesManagerImpl implements ForwardingRulesManager {
-
private static final Logger LOG = LoggerFactory.getLogger(ForwardingRulesManagerImpl.class);
+
static final int STARTUP_LOOP_TICK = 500;
static final int STARTUP_LOOP_MAX_RETRIES = 8;
private final AtomicLong txNum = new AtomicLong();
- private final Object lockObj = new Object();
- private Set<InstanceIdentifier<FlowCapableNode>> activeNodes = Collections.emptySet();
-
private final DataBroker dataService;
private final SalFlowService salFlowService;
private final SalGroupService salGroupService;
private final SalMeterService salMeterService;
private final SalTableService salTableService;
+ private final ForwardingRulesManagerConfig forwardingRulesManagerConfig;
+ private final ClusterSingletonServiceProvider clusterSingletonServiceProvider;
+ private final NotificationProviderService notificationService;
private ForwardingRulesCommiter<Flow> flowListener;
private ForwardingRulesCommiter<Group> groupListener;
private ForwardingRulesCommiter<Meter> meterListener;
private ForwardingRulesCommiter<TableFeatures> tableListener;
private FlowNodeReconciliation nodeListener;
-
- private final ForwardingRulesManagerConfig forwardingRulesManagerConfig;
private FlowNodeConnectorInventoryTranslatorImpl flowNodeConnectorInventoryTranslatorImpl;
- private final ClusterSingletonServiceProvider clusterSingletonServiceProvider;
- private final NotificationProviderService notificationService;
private DeviceMastershipManager deviceMastershipManager;
public ForwardingRulesManagerImpl(final DataBroker dataBroker,
@Override
public void start() {
+ this.nodeListener = new FlowNodeReconciliationImpl(this, dataService);
this.deviceMastershipManager = new DeviceMastershipManager(clusterSingletonServiceProvider,
- notificationService);
+ notificationService,
+ this.nodeListener,
+ dataService);
+ flowNodeConnectorInventoryTranslatorImpl = new FlowNodeConnectorInventoryTranslatorImpl(this,dataService);
+
this.flowListener = new FlowForwarder(this, dataService);
this.groupListener = new GroupForwarder(this, dataService);
this.meterListener = new MeterForwarder(this, dataService);
this.tableListener = new TableForwarder(this, dataService);
- this.nodeListener = new FlowNodeReconciliationImpl(this, dataService);
- flowNodeConnectorInventoryTranslatorImpl =
- new FlowNodeConnectorInventoryTranslatorImpl(this,dataService);
LOG.info("ForwardingRulesManager has started successfully.");
}
@Override
public boolean isNodeActive(InstanceIdentifier<FlowCapableNode> ident) {
- return activeNodes.contains(ident);
+ return deviceMastershipManager.isNodeActive(ident.firstKeyOf(Node.class).getId());
}
@Override
return result;
}
- @Override
- public void registrateNewNode(InstanceIdentifier<FlowCapableNode> ident) {
- if (!activeNodes.contains(ident)) {
- synchronized (lockObj) {
- if (!activeNodes.contains(ident)) {
- Set<InstanceIdentifier<FlowCapableNode>> set =
- Sets.newHashSet(activeNodes);
- set.add(ident);
- activeNodes = Collections.unmodifiableSet(set);
- deviceMastershipManager.onDeviceConnected(ident.firstKeyOf(Node.class).getId());
- }
- }
- }
- }
-
- @Override
- public void unregistrateNode(InstanceIdentifier<FlowCapableNode> ident) {
- if (activeNodes.contains(ident)) {
- synchronized (lockObj) {
- if (activeNodes.contains(ident)) {
- Set<InstanceIdentifier<FlowCapableNode>> set =
- Sets.newHashSet(activeNodes);
- set.remove(ident);
- activeNodes = Collections.unmodifiableSet(set);
- deviceMastershipManager.onDeviceDisconnected(ident.firstKeyOf(Node.class).getId());
- }
- }
- }
- }
-
@Override
public SalFlowService getSalFlowService() {
return salFlowService;
return tableListener;
}
- @Override
- public FlowNodeReconciliation getFlowNodeReconciliation() {
- return nodeListener;
- }
-
@Override
public ForwardingRulesManagerConfig getConfiguration() {
return forwardingRulesManagerConfig;
public void setDeviceMastershipManager(final DeviceMastershipManager deviceMastershipManager) {
this.deviceMastershipManager = deviceMastershipManager;
}
-
}
/**
- * Copyright (c) 2016 Pantheon Technologies s.r.o. and others. All rights reserved.
+ * Copyright (c) 2016, 2017 Pantheon Technologies s.r.o. and others. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.runners.MockitoJUnitRunner;
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonService;
import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
+import org.opendaylight.openflowplugin.applications.frm.FlowNodeReconciliation;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRemovedBuilder;
private ClusterSingletonServiceProvider clusterSingletonService;
@Mock
private NotificationProviderService notificationService;
+ @Mock
+ private FlowNodeReconciliation reconciliationAgent;
+ @Mock
+ private DataBroker dataBroker;
@Before
public void setUp() throws Exception {
deviceMastershipManager = new DeviceMastershipManager(clusterSingletonService,
- notificationService);
+ notificationService, reconciliationAgent, dataBroker);
Mockito.when(clusterSingletonService.registerClusterSingletonService(Matchers.<ClusterSingletonService>any()))
.thenReturn(registration);
}
DeviceMastership serviceInstance = deviceMastershipManager.getDeviceMasterships().get(NODE_ID);
Assert.assertNotNull(serviceInstance);
// destroy context - unregister
- deviceMastershipManager.onDeviceDisconnected(NODE_ID);
Assert.assertNotNull(deviceMastershipManager.getDeviceMasterships().get(NODE_ID));
NodeRemovedBuilder nodeRemovedBuilder = new NodeRemovedBuilder();
InstanceIdentifier<Node> nodeIId = InstanceIdentifier.create(Nodes.class).
/**
- * Copyright (c) 2016 Pantheon Technologies s.r.o. and others. All rights reserved.
+ * Copyright (c) 2016, 2017 Pantheon Technologies s.r.o. and others. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
import org.mockito.Mockito;
import org.mockito.runners.MockitoJUnitRunner;
import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
+import org.opendaylight.openflowplugin.applications.frm.FlowNodeReconciliation;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
/**
@Mock
private DeviceMastershipManager deviceMastershipManager;
+ @Mock
+ private FlowNodeReconciliation reconcliationAgent;
+
@Before
public void setUp() throws Exception {
- deviceMastership = new DeviceMastership(NODE_ID, Mockito.mock(ClusterSingletonServiceProvider.class));
+ deviceMastership = new DeviceMastership(NODE_ID, Mockito.mock(ClusterSingletonServiceProvider.class), reconcliationAgent);
}
@Test