import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.Date;
import java.util.EnumSet;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import org.eclipse.osgi.framework.console.CommandInterpreter;
import org.opendaylight.controller.clustering.services.IClusterContainerServices;
import org.opendaylight.controller.clustering.services.IClusterServices;
import org.opendaylight.controller.configuration.IConfigurationContainerAware;
+import org.opendaylight.controller.connectionmanager.IConnectionManager;
import org.opendaylight.controller.forwardingrulesmanager.FlowConfig;
import org.opendaylight.controller.forwardingrulesmanager.FlowEntry;
import org.opendaylight.controller.forwardingrulesmanager.FlowEntryInstall;
import org.opendaylight.controller.forwardingrulesmanager.PortGroupChangeListener;
import org.opendaylight.controller.forwardingrulesmanager.PortGroupConfig;
import org.opendaylight.controller.forwardingrulesmanager.PortGroupProvider;
+import org.opendaylight.controller.forwardingrulesmanager.implementation.data.FlowEntryDistributionOrder;
import org.opendaylight.controller.sal.action.Action;
import org.opendaylight.controller.sal.action.ActionType;
import org.opendaylight.controller.sal.action.Controller;
* the network. It also maintains the central repository of all the forwarding
* rules installed on the network nodes.
*/
-public class ForwardingRulesManager implements IForwardingRulesManager, PortGroupChangeListener,
- IContainerListener, ISwitchManagerAware, IConfigurationContainerAware, IInventoryListener, IObjectReader,
- ICacheUpdateAware<Long, String>, CommandProvider, IFlowProgrammerListener {
- private static final String SAVE = "Save";
+public class ForwardingRulesManager implements
+ IForwardingRulesManager,
+ PortGroupChangeListener,
+ IContainerListener,
+ ISwitchManagerAware,
+ IConfigurationContainerAware,
+ IInventoryListener,
+ IObjectReader,
+ ICacheUpdateAware,
+ CommandProvider,
+ IFlowProgrammerListener {
private static final String NODEDOWN = "Node is Down";
private static final String SUCCESS = StatusCode.SUCCESS.toString();
private static final Logger log = LoggerFactory.getLogger(ForwardingRulesManager.class);
- private Map<Long, String> flowsSaveEvent;
+ private static final String PORTREMOVED = "Port removed";
+ private static final Logger logsync = LoggerFactory.getLogger("FRMsync");
private String frmFileName;
private String portGroupFileName;
private ConcurrentMap<Integer, FlowConfig> staticFlows;
private ConcurrentMap<FlowEntry, FlowEntry> inactiveFlows;
private IContainer container;
- private Set<IForwardingRulesManagerAware> frmAware;
+ private Set<IForwardingRulesManagerAware> frmAware =
+ Collections.synchronizedSet(new HashSet<IForwardingRulesManagerAware>());
private PortGroupProvider portGroupProvider;
private IFlowProgrammerService programmer;
private IClusterContainerServices clusterContainerService = null;
private Thread frmEventHandler;
protected BlockingQueue<FRMEvent> pendingEvents;
+ // Distributes FRM programming in the cluster
+ private IConnectionManager connectionManager;
+
+ /*
+ * Names of the clustered caches used to support FRM entry distribution. These
+ * are by necessity non-transactional, as we need to be able to synchronize
+ * state even while a transaction is in progress
+ */
+ static final String WORKORDERCACHE = "frm.workOrder";
+ static final String WORKSTATUSCACHE = "frm.workStatus";
+
+ /*
+ * Data structure responsible for distributing the FlowEntryInstall requests
+ * in the cluster. The key is the entry that is being Installed, Updated or
+ * Deleted. The value field is the same as the key in the case of
+ * Installation or Deletion; it is the new entry in the case of Modification,
+ * because the clustering caches don't allow null values.
+ *
+ * The logic behind this data structure is that the controller that initiates
+ * the request will place the order here; someone will pick it up and then
+ * remove it from this data structure because it is being served.
+ *
+ * TODO: We need to have a way to clean up this data structure if entries are
+ * not picked up by anyone, which is a case that can always happen, especially
+ * on Node disconnect.
+ */
+ private ConcurrentMap<FlowEntryDistributionOrder, FlowEntryInstall> workOrder;
+
+ /*
+ * Data structure responsible for retrieving the results of the workOrder
+ * submitted to the cluster.
+ *
+ * The logic behind this data structure is that the controller that has
+ * executed the order will then place the result in workStatus signaling
+ * that there was a success or a failure.
+ *
+ * TODO: The workStatus entries need to have a lifetime associated, in case
+ * the requestor controller leaves the cluster.
+ */
+ private ConcurrentMap<FlowEntryDistributionOrder, Status> workStatus;
+
+ /*
+ * Local Map used to hold the Future which a caller can use to monitor for
+ * completion
+ */
+ private ConcurrentMap<FlowEntryDistributionOrder, FlowEntryDistributionOrderFutureTask> workMonitor =
+ new ConcurrentHashMap<FlowEntryDistributionOrder, FlowEntryDistributionOrderFutureTask>();
+
+ /*
+ * Create an executor pool to create the distributionOrder. This is a stop-gap
+ * solution caused by an issue with non-transactional caches in the
+ * implementation we use, which is currently being worked on. It has in fact
+ * been noticed that when non-transactional caches are being used, sometimes
+ * the keys are not distributed to all the nodes properly. To work around the
+ * issue, transactional caches are being used, but there was a reason for
+ * using non-transactional caches to start with: we needed to be able, in the
+ * context of a northbound transaction, to program the FRM entries
+ * irrespective of whether the transaction would commit, else we would not be
+ * able to achieve the entry programming and implement the scheme for
+ * recovery from network element failures. Bottom line: in order to make sure
+ * an update on a transactional cache goes out while in a transaction, it
+ * needs to be initiated by a different thread.
+ */
+ private ExecutorService executor;
+
+ class DistributeOrderCallable implements Callable<Future<Status>> {
+ private FlowEntryInstall e;
+ private FlowEntryInstall u;
+ private UpdateType t;
+ DistributeOrderCallable(FlowEntryInstall e, FlowEntryInstall u, UpdateType t) {
+ this.e = e;
+ this.u = u;
+ this.t = t;
+ }
+
+ @Override
+ public Future<Status> call() throws Exception {
+ if (e == null || t == null) {
+ logsync.error("Unexpected null Entry up update type");
+ return null;
+ }
+ // Create the work order and distribute it
+ FlowEntryDistributionOrder fe =
+ new FlowEntryDistributionOrder(e, t, clusterContainerService.getMyAddress());
+ // First create the monitor job
+ FlowEntryDistributionOrderFutureTask ret = new FlowEntryDistributionOrderFutureTask(fe);
+ logsync.trace("Node {} not local so sending fe {}", e.getNode(), fe);
+ workMonitor.put(fe, ret);
+ if (t.equals(UpdateType.CHANGED)) {
+ // Then distribute the work
+ workOrder.put(fe, u);
+ } else {
+ // Then distribute the work
+ workOrder.put(fe, e);
+ }
+ logsync.trace("WorkOrder requested");
+ // Now create a handle to monitor the execution of the operation
+ return ret;
+ }
+ }
+
+ /**
+ * @param e
+ * Entry being installed/updated/removed
+ * @param u
+ * New entry will be placed after the update operation. Valid
+ * only for UpdateType.CHANGED, null for all the other cases
+ * @param t
+ * Type of update
+ * @return a Future object for monitoring the progress of the result, or
+ * null in case the processing should take place locally
+ */
+ private Future<Status> distributeWorkOrder(FlowEntryInstall e, FlowEntryInstall u, UpdateType t) {
+ // A null entry is an unexpected condition; in any case it's safe to keep
+ // the handling local
+ if (e == null) {
+ return null;
+ }
+
+ Node n = e.getNode();
+ if (!connectionManager.isLocal(n)) {
+ Callable<Future<Status>> worker = new DistributeOrderCallable(e, u, t);
+ if (worker != null) {
+ Future<Future<Status>> workerRes = this.executor.submit(worker);
+ try {
+ return workerRes.get();
+ } catch (InterruptedException e1) {
+ // we were interrupted, not a big deal.
+ return null;
+ } catch (ExecutionException e1) {
+ logsync.error(
+ "We got an execution exception {} we cannot much, so returning we don't have nothing to wait for",
+ e);
+ return null;
+ }
+ }
+ }
+
+ logsync.trace("LOCAL Node {} so processing Entry:{} UpdateType:{}", n, e, t);
+ return null;
+ }
+
/**
* Adds a flow entry onto the network node It runs various validity checks
* and derive the final container flows merged entries that will be
* contain the unique id assigned to this request
*/
private Status modifyEntryInternal(FlowEntryInstall currentEntries, FlowEntryInstall newEntries, boolean async) {
- // Modify the flow on the network node
- Status status = (async) ? programmer.modifyFlowAsync(currentEntries.getNode(), currentEntries.getInstall()
- .getFlow(), newEntries.getInstall().getFlow()) : programmer.modifyFlow(currentEntries.getNode(),
- currentEntries.getInstall().getFlow(), newEntries.getInstall().getFlow());
+ Future<Status> futureStatus = distributeWorkOrder(currentEntries, newEntries, UpdateType.CHANGED);
+ if (futureStatus != null) {
+ Status retStatus = new Status(StatusCode.UNDEFINED);
+ try {
+ retStatus = futureStatus.get();
+ } catch (InterruptedException e) {
+ log.error("", e);
+ } catch (ExecutionException e) {
+ log.error("", e);
+ }
+ return retStatus;
+ } else {
+ // Modify the flow on the network node
+ Status status = async ? programmer.modifyFlowAsync(currentEntries.getNode(), currentEntries.getInstall()
+ .getFlow(), newEntries.getInstall()
+ .getFlow()) : programmer.modifyFlow(currentEntries.getNode(), currentEntries.getInstall()
+ .getFlow(), newEntries.getInstall()
+ .getFlow());
- if (!status.isSuccess()) {
- log.warn("SDN Plugin failed to program the flow: {}. The failure is: {}", newEntries.getInstall(),
- status.getDescription());
- return status;
- }
+ if (!status.isSuccess()) {
+ log.warn("SDN Plugin failed to program the flow: {}. The failure is: {}", newEntries.getInstall(),
+ status.getDescription());
+ return status;
+ }
- log.trace("Modified {} => {}", currentEntries.getInstall(), newEntries.getInstall());
+ log.trace("Modified {} => {}", currentEntries.getInstall(), newEntries.getInstall());
- // Update DB
- newEntries.setRequestId(status.getRequestId());
- updateLocalDatabase(currentEntries, false);
- updateLocalDatabase(newEntries, true);
+ // Update DB
+ newEntries.setRequestId(status.getRequestId());
+ updateLocalDatabase(currentEntries, false);
+ updateLocalDatabase(newEntries, true);
- return status;
+ return status;
+ }
}
/**
* contain the unique id assigned to this request
*/
private Status removeEntryInternal(FlowEntryInstall entry, boolean async) {
- // Mark the entry to be deleted (for CC just in case we fail)
- entry.toBeDeleted();
+ Future<Status> futureStatus = distributeWorkOrder(entry, null, UpdateType.REMOVED);
+ if (futureStatus != null) {
+ Status retStatus = new Status(StatusCode.UNDEFINED);
+ try {
+ retStatus = futureStatus.get();
+ } catch (InterruptedException e) {
+ log.error("", e);
+ } catch (ExecutionException e) {
+ log.error("", e);
+ }
+ return retStatus;
+ } else {
+ // Mark the entry to be deleted (for CC just in case we fail)
+ entry.toBeDeleted();
- // Remove from node
- Status status = (async) ? programmer.removeFlowAsync(entry.getNode(), entry.getInstall().getFlow())
- : programmer.removeFlow(entry.getNode(), entry.getInstall().getFlow());
+ // Remove from node
+ Status status = async ? programmer.removeFlowAsync(entry.getNode(), entry.getInstall()
+ .getFlow()) : programmer.removeFlow(entry.getNode(), entry.getInstall()
+ .getFlow());
- if (!status.isSuccess()) {
- log.warn("SDN Plugin failed to program the flow: {}. The failure is: {}", entry.getInstall(),
- status.getDescription());
- return status;
- }
- log.trace("Removed {}", entry.getInstall());
+ if (!status.isSuccess()) {
+ log.warn("SDN Plugin failed to program the flow: {}. The failure is: {}", entry.getInstall(),
+ status.getDescription());
+ return status;
+ }
+ log.trace("Removed {}", entry.getInstall());
- // Update DB
- updateLocalDatabase(entry, false);
+ // Update DB
+ updateLocalDatabase(entry, false);
- return status;
+ return status;
+ }
}
/**
* contain the unique id assigned to this request
*/
private Status addEntriesInternal(FlowEntryInstall entry, boolean async) {
- // Install the flow on the network node
- Status status = (async) ? programmer.addFlowAsync(entry.getNode(), entry.getInstall().getFlow()) : programmer
- .addFlow(entry.getNode(), entry.getInstall().getFlow());
+ Future<Status> futureStatus = distributeWorkOrder(entry, null, UpdateType.ADDED);
+ if (futureStatus != null) {
+ Status retStatus = new Status(StatusCode.UNDEFINED);
+ try {
+ retStatus = futureStatus.get();
+ } catch (InterruptedException e) {
+ log.error("", e);
+ } catch (ExecutionException e) {
+ log.error("", e);
+ }
+ return retStatus;
+ } else {
+ // Install the flow on the network node
+ Status status = async ? programmer.addFlowAsync(entry.getNode(), entry.getInstall()
+ .getFlow()) : programmer.addFlow(entry.getNode(), entry.getInstall()
+ .getFlow());
- if (!status.isSuccess()) {
- log.warn("SDN Plugin failed to program the flow: {}. The failure is: {}", entry.getInstall(),
- status.getDescription());
- return status;
- }
+ if (!status.isSuccess()) {
+ log.warn("SDN Plugin failed to program the flow: {}. The failure is: {}", entry.getInstall(),
+ status.getDescription());
+ return status;
+ }
- log.trace("Added {}", entry.getInstall());
+ log.trace("Added {}", entry.getInstall());
- // Update DB
- entry.setRequestId(status.getRequestId());
- updateLocalDatabase(entry, true);
+ // Update DB
+ entry.setRequestId(status.getRequestId());
+ updateLocalDatabase(entry, true);
- return status;
+ return status;
+ }
}
/**
portGroupConfigs = new ConcurrentHashMap<String, PortGroupConfig>();
portGroupData = new ConcurrentHashMap<PortGroupConfig, Map<Node, PortGroup>>();
staticFlows = new ConcurrentHashMap<Integer, FlowConfig>();
- flowsSaveEvent = new HashMap<Long, String>();
inactiveFlows = new ConcurrentHashMap<FlowEntry, FlowEntry>();
}
return list;
}
+ @Override
+ public List<FlowEntry> getInstalledFlowEntriesForGroup(String policyName) {
+ List<FlowEntry> list = new ArrayList<FlowEntry>();
+ if (policyName != null && !policyName.trim().isEmpty()) {
+ for (Map.Entry<FlowEntryInstall, FlowEntryInstall> entry : this.installedSwView.entrySet()) {
+ if (policyName.equals(entry.getKey().getGroupName())) {
+ list.add(entry.getKey().getInstall().clone());
+ }
+ }
+ }
+ return list;
+ }
+
@Override
public void addOutputPort(Node node, String flowName, List<NodeConnector> portList) {
try {
clusterContainerService.createCache("frm.originalSwView",
- EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
+ EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
clusterContainerService.createCache("frm.installedSwView",
- EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
+ EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
clusterContainerService.createCache("frm.inactiveFlows",
- EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
+ EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
clusterContainerService.createCache("frm.nodeFlows",
- EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
+ EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
clusterContainerService.createCache("frm.groupFlows",
- EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
+ EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
clusterContainerService.createCache("frm.staticFlows",
- EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
+ EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
clusterContainerService.createCache("frm.flowsSaveEvent",
- EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
+ EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
clusterContainerService.createCache("frm.staticFlowsOrdinal",
- EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
+ EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
clusterContainerService.createCache("frm.portGroupConfigs",
- EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
+ EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
clusterContainerService.createCache("frm.portGroupData",
- EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
+ EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
clusterContainerService.createCache("frm.TSPolicies",
- EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
+ EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
+
+ clusterContainerService.createCache(WORKSTATUSCACHE,
+ EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
+
+ clusterContainerService.createCache(WORKORDERCACHE,
+ EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
} catch (CacheConfigException cce) {
log.error("CacheConfigException");
log.error("Retrieval of frm.staticFlows cache failed for Container {}", container.getName());
}
- map = clusterContainerService.getCache("frm.flowsSaveEvent");
- if (map != null) {
- flowsSaveEvent = (ConcurrentMap<Long, String>) map;
- } else {
- log.error("Retrieval of frm.flowsSaveEvent cache failed for Container {}", container.getName());
- }
-
map = clusterContainerService.getCache("frm.staticFlowsOrdinal");
if (map != null) {
staticFlowsOrdinal = (ConcurrentMap<Integer, Integer>) map;
log.error("Retrieval of frm.TSPolicies cache failed for Container {}", container.getName());
}
+ map = clusterContainerService.getCache(WORKORDERCACHE);
+ if (map != null) {
+ workOrder = (ConcurrentMap<FlowEntryDistributionOrder, FlowEntryInstall>) map;
+ } else {
+ log.error("Retrieval of " + WORKORDERCACHE + " cache failed for Container {}", container.getName());
+ }
+
+ map = clusterContainerService.getCache(WORKSTATUSCACHE);
+ if (map != null) {
+ workStatus = (ConcurrentMap<FlowEntryDistributionOrder, Status>) map;
+ } else {
+ log.error("Retrieval of " + WORKSTATUSCACHE + " cache failed for Container {}", container.getName());
+ }
}
private boolean flowConfigExists(FlowConfig config) {
/**
* Uninstall all the non-internal Flow Entries present in the software view.
- * A copy of each entry is stored in the inactive list so that it can be
- * re-applied when needed. This function is called on the global instance of
- * FRM only, when the first container is created
+ * If requested, a copy of each original flow entry will be stored in the
+ * inactive list so that it can be re-applied when needed (This is typically
+ * the case when running in the default container and controller moved to
+ * container mode)
+ *
+ * @param preserveFlowEntries
+ * if true, a copy of each original entry is stored in the
+ * inactive list
*/
- private void uninstallAllFlowEntries() {
+ private void uninstallAllFlowEntries(boolean preserveFlowEntries) {
log.info("Uninstalling all non-internal flows");
+ List<FlowEntryInstall> toRemove = new ArrayList<FlowEntryInstall>();
+
// Store entries / create target list
for (ConcurrentMap.Entry<FlowEntryInstall, FlowEntryInstall> mapEntry : installedSwView.entrySet()) {
FlowEntryInstall flowEntries = mapEntry.getValue();
// Skip internal generated static flows
if (!flowEntries.isInternal()) {
- inactiveFlows.put(flowEntries.getOriginal(), flowEntries.getOriginal());
+ toRemove.add(flowEntries);
+ // Store the original entries if requested
+ if (preserveFlowEntries) {
+ inactiveFlows.put(flowEntries.getOriginal(), flowEntries.getOriginal());
+ }
}
}
// Now remove the entries
- for (FlowEntry flowEntry : inactiveFlows.keySet()) {
- Status status = this.removeEntry(flowEntry, false);
+ for (FlowEntryInstall flowEntryHw : toRemove) {
+ Status status = this.removeEntryInternal(flowEntryHw, false);
if (!status.isSuccess()) {
- log.warn("Failed to remove entry: {}. The failure is: {}", flowEntry, status.getDescription());
+ log.warn("Failed to remove entry: {}. The failure is: {}", flowEntryHw, status.getDescription());
}
}
}
@Override
public Status saveConfig() {
- // Publish the save config event to the cluster nodes
- flowsSaveEvent.put(new Date().getTime(), SAVE);
return saveConfigInternal();
}
return new Status(StatusCode.SUCCESS, null);
}
- @Override
- public void entryCreated(Long key, String cacheName, boolean local) {
- }
-
- @Override
- public void entryUpdated(Long key, String new_value, String cacheName, boolean originLocal) {
- saveConfigInternal();
- }
-
- @Override
- public void entryDeleted(Long key, String cacheName, boolean originLocal) {
- }
-
@Override
public void subnetNotify(Subnet sub, boolean add) {
}
addStaticFlowInternal(allowARP, true); // skip validation on internal static flow name
}
+ /**
+ * (non-Javadoc)
+ *
+ * @see org.opendaylight.controller.switchmanager.ISwitchManagerAware#modeChangeNotify(org.opendaylight.controller.sal.core.Node,
+ * boolean)
+ *
+ * This method can be called from within the OSGi framework context;
+ * given that the programming operation can take some time, it is not
+ * good practice to have operations that can take time in its context,
+ * hence moving them off to a different thread for async processing.
+ */
@Override
- public void modeChangeNotify(Node node, boolean proactive) {
- List<FlowConfig> defaultConfigs = new ArrayList<FlowConfig>();
-
- List<String> puntAction = new ArrayList<String>();
- puntAction.add(ActionType.CONTROLLER.toString());
-
- FlowConfig allowARP = new FlowConfig();
- allowARP.setInstallInHw(true);
- allowARP.setName(FlowConfig.INTERNALSTATICFLOWBEGIN + "Punt ARP" + FlowConfig.INTERNALSTATICFLOWEND);
- allowARP.setPriority("1");
- allowARP.setNode(node);
- allowARP.setEtherType("0x" + Integer.toHexString(EtherTypes.ARP.intValue()).toUpperCase());
- allowARP.setActions(puntAction);
- defaultConfigs.add(allowARP);
-
- FlowConfig allowLLDP = new FlowConfig();
- allowLLDP.setInstallInHw(true);
- allowLLDP.setName(FlowConfig.INTERNALSTATICFLOWBEGIN + "Punt LLDP" + FlowConfig.INTERNALSTATICFLOWEND);
- allowLLDP.setPriority("1");
- allowLLDP.setNode(node);
- allowLLDP.setEtherType("0x" + Integer.toHexString(EtherTypes.LLDP.intValue()).toUpperCase());
- allowLLDP.setActions(puntAction);
- defaultConfigs.add(allowLLDP);
-
- List<String> dropAction = new ArrayList<String>();
- dropAction.add(ActionType.DROP.toString());
-
- FlowConfig dropAllConfig = new FlowConfig();
- dropAllConfig.setInstallInHw(true);
- dropAllConfig.setName(FlowConfig.INTERNALSTATICFLOWBEGIN + "Catch-All Drop" + FlowConfig.INTERNALSTATICFLOWEND);
- dropAllConfig.setPriority("0");
- dropAllConfig.setNode(node);
- dropAllConfig.setActions(dropAction);
- defaultConfigs.add(dropAllConfig);
-
- log.info("Forwarding mode for node {} set to {}", node, (proactive ? "proactive" : "reactive"));
- for (FlowConfig fc : defaultConfigs) {
- Status status = (proactive) ? addStaticFlowInternal(fc, true) : removeStaticFlow(fc);
- if (status.isSuccess()) {
- log.info("{} Proactive Static flow: {}", (proactive ? "Installed" : "Removed"), fc.getName());
- } else {
- log.warn("Failed to {} Proactive Static flow: {}", (proactive ? "install" : "remove"), fc.getName());
+ public void modeChangeNotify(final Node node, final boolean proactive) {
+ Callable<Status> modeChangeCallable = new Callable<Status>() {
+ @Override
+ public Status call() throws Exception {
+ List<FlowConfig> defaultConfigs = new ArrayList<FlowConfig>();
+
+ List<String> puntAction = new ArrayList<String>();
+ puntAction.add(ActionType.CONTROLLER.toString());
+
+ FlowConfig allowARP = new FlowConfig();
+ allowARP.setInstallInHw(true);
+ allowARP.setName(FlowConfig.INTERNALSTATICFLOWBEGIN + "Punt ARP" + FlowConfig.INTERNALSTATICFLOWEND);
+ allowARP.setPriority("1");
+ allowARP.setNode(node);
+ allowARP.setEtherType("0x" + Integer.toHexString(EtherTypes.ARP.intValue())
+ .toUpperCase());
+ allowARP.setActions(puntAction);
+ defaultConfigs.add(allowARP);
+
+ FlowConfig allowLLDP = new FlowConfig();
+ allowLLDP.setInstallInHw(true);
+ allowLLDP.setName(FlowConfig.INTERNALSTATICFLOWBEGIN + "Punt LLDP" + FlowConfig.INTERNALSTATICFLOWEND);
+ allowLLDP.setPriority("1");
+ allowLLDP.setNode(node);
+ allowLLDP.setEtherType("0x" + Integer.toHexString(EtherTypes.LLDP.intValue())
+ .toUpperCase());
+ allowLLDP.setActions(puntAction);
+ defaultConfigs.add(allowLLDP);
+
+ List<String> dropAction = new ArrayList<String>();
+ dropAction.add(ActionType.DROP.toString());
+
+ FlowConfig dropAllConfig = new FlowConfig();
+ dropAllConfig.setInstallInHw(true);
+ dropAllConfig.setName(FlowConfig.INTERNALSTATICFLOWBEGIN + "Catch-All Drop"
+ + FlowConfig.INTERNALSTATICFLOWEND);
+ dropAllConfig.setPriority("0");
+ dropAllConfig.setNode(node);
+ dropAllConfig.setActions(dropAction);
+ defaultConfigs.add(dropAllConfig);
+
+ log.info("Forwarding mode for node {} set to {}", node, (proactive ? "proactive" : "reactive"));
+ for (FlowConfig fc : defaultConfigs) {
+ Status status = (proactive) ? addStaticFlowInternal(fc, false) : removeStaticFlow(fc);
+ if (status.isSuccess()) {
+ log.info("{} Proactive Static flow: {}", (proactive ? "Installed" : "Removed"), fc.getName());
+ } else {
+ log.warn("Failed to {} Proactive Static flow: {}", (proactive ? "install" : "remove"),
+ fc.getName());
+ }
+ }
+ return new Status(StatusCode.SUCCESS);
}
- }
+ };
+
+ /*
+ * Execute the work outside the caller context, this could be an
+ * expensive operation and we don't want to block the caller for it.
+ */
+ this.executor.submit(modeChangeCallable);
}
/**
}
}
+ private boolean doesFlowContainNodeConnector(Flow flow, NodeConnector nc) {
+ if (nc == null) {
+ return false;
+ }
+
+ Match match = flow.getMatch();
+ if (match.isPresent(MatchType.IN_PORT)) {
+ NodeConnector matchPort = (NodeConnector) match.getField(MatchType.IN_PORT).getValue();
+ if (matchPort.equals(nc)) {
+ return true;
+ }
+ }
+ List<Action> actionsList = flow.getActions();
+ if (actionsList != null) {
+ for (Action action : actionsList) {
+ if (action instanceof Output) {
+ NodeConnector actionPort = ((Output) action).getPort();
+ if (actionPort.equals(nc)) {
+ return true;
+ }
+ }
+ }
+ }
+ return false;
+ }
+
@Override
public void notifyNode(Node node, UpdateType type, Map<String, Property> propMap) {
this.pendingEvents.offer(new NodeUpdateEvent(type, node));
*
*/
void init() {
- frmAware = Collections.synchronizedSet(new HashSet<IForwardingRulesManagerAware>());
frmFileName = GlobalConstants.STARTUPHOME.toString() + "frm_staticflows_" + this.getContainerName() + ".conf";
portGroupFileName = GlobalConstants.STARTUPHOME.toString() + "portgroup_" + this.getContainerName() + ".conf";
} else if (event instanceof ErrorReportedEvent) {
ErrorReportedEvent errEvent = (ErrorReportedEvent) event;
processErrorEvent(errEvent);
+ } else if (event instanceof WorkOrderEvent) {
+ /*
+ * Take care of handling the remote Work request
+ */
+ WorkOrderEvent work = (WorkOrderEvent) event;
+ FlowEntryDistributionOrder fe = work.getFe();
+ if (fe != null) {
+ logsync.trace("Executing the workOrder {}", fe);
+ Status gotStatus = null;
+ FlowEntryInstall feiCurrent = fe.getEntry();
+ FlowEntryInstall feiNew = workOrder.get(fe.getEntry());
+ switch (fe.getUpType()) {
+ case ADDED:
+ /*
+ * TODO: Not still sure how to handle the
+ * sync entries
+ */
+ gotStatus = addEntriesInternal(feiCurrent, true);
+ break;
+ case CHANGED:
+ gotStatus = modifyEntryInternal(feiCurrent, feiNew, true);
+ break;
+ case REMOVED:
+ gotStatus = removeEntryInternal(feiCurrent, true);
+ break;
+ }
+ // Remove the Order
+ workOrder.remove(fe);
+ logsync.trace(
+ "The workOrder has been executed and now the status is being returned {}", fe);
+ // Place the status
+ workStatus.put(fe, gotStatus);
+ } else {
+ log.warn("Not expected null WorkOrder", work);
+ }
+ } else if (event instanceof WorkStatusCleanup) {
+ /*
+ * Take care of handling the remote Work request
+ */
+ WorkStatusCleanup work = (WorkStatusCleanup) event;
+ FlowEntryDistributionOrder fe = work.getFe();
+ if (fe != null) {
+ logsync.trace("The workStatus {} is being removed", fe);
+ workStatus.remove(fe);
+ } else {
+ log.warn("Not expected null WorkStatus", work);
+ }
} else {
- log.warn("Dequeued unknown event {}", event.getClass().getSimpleName());
+ log.warn("Dequeued unknown event {}", event.getClass()
+ .getSimpleName());
}
} catch (InterruptedException e) {
- log.warn("FRM EventHandler thread interrupted", e);
+ // clear pending events
+ pendingEvents.clear();
}
}
}
*
*/
void destroy() {
+ // Interrupt the thread
+ frmEventHandler.interrupt();
+ // Clear the pendingEvents queue
+ pendingEvents.clear();
+ frmAware.clear();
+ workMonitor.clear();
}
/**
// Initialize graceful stop flag
stopping = false;
+ // Allocate the executor service
+ this.executor = Executors.newSingleThreadExecutor();
+
// Start event handler thread
frmEventHandler.start();
*/
void stop() {
stopping = true;
- uninstallAllFlowEntries();
+ uninstallAllFlowEntries(false);
+ // Shutdown executor
+ this.executor.shutdownNow();
}
public void setFlowProgrammerService(IFlowProgrammerService service) {
}
@Override
- public void nodeConnectorUpdated(String containerName, NodeConnector p, UpdateType t) {
+ public void nodeConnectorUpdated(String containerName, NodeConnector nc, UpdateType t) {
if (!container.getName().equals(containerName)) {
return;
}
+
+ boolean updateStaticFlowCluster = false;
+
+ switch (t) {
+ case REMOVED:
+
+ List<FlowEntryInstall> nodeFlowEntries = nodeFlows.get(nc.getNode());
+ if (nodeFlowEntries == null) {
+ return;
+ }
+ for (FlowEntryInstall fei : new ArrayList<FlowEntryInstall>(nodeFlowEntries)) {
+ if (doesFlowContainNodeConnector(fei.getInstall().getFlow(), nc)) {
+ Status status = this.removeEntryInternal(fei, true);
+ if (!status.isSuccess()) {
+ continue;
+ }
+ /*
+ * If the flow entry is a static flow, then update its
+ * configuration
+ */
+ if (fei.getGroupName().equals(FlowConfig.STATICFLOWGROUP)) {
+ FlowConfig flowConfig = getStaticFlow(fei.getFlowName(), fei.getNode());
+ if (flowConfig != null) {
+ flowConfig.setStatus(PORTREMOVED);
+ updateStaticFlowCluster = true;
+ }
+ }
+ }
+ }
+ if (updateStaticFlowCluster) {
+ refreshClusterStaticFlowsStatus(nc.getNode());
+ }
+ break;
+ case ADDED:
+ List<FlowConfig> flowConfigForNode = getStaticFlows(nc.getNode());
+ for (FlowConfig flowConfig : flowConfigForNode) {
+ if (doesFlowContainNodeConnector(flowConfig.getFlow(), nc)) {
+ if (flowConfig.installInHw()) {
+ Status status = this.installFlowEntry(flowConfig.getFlowEntry());
+ if (!status.isSuccess()) {
+ flowConfig.setStatus(status.getDescription());
+ } else {
+ flowConfig.setStatus(SUCCESS);
+ }
+ updateStaticFlowCluster = true;
+ }
+ }
+ }
+ if (updateStaticFlowCluster) {
+ refreshClusterStaticFlowsStatus(nc.getNode());
+ }
+ break;
+ case CHANGED:
+ break;
+ default:
+ }
}
@Override
}
switch (update) {
case ADDED:
+ /*
+ * Controller is moving to container mode. We are in the default
+ * container context, we need to remove all our non-internal flows
+ * to prevent any container isolation breakage. We also need to
+ * preserve our flow so that they can be re-installed if we move
+ * back to non container mode (no containers).
+ */
this.inContainerMode = true;
- this.uninstallAllFlowEntries();
+ this.uninstallAllFlowEntries(true);
break;
case REMOVED:
this.inContainerMode = false;
}
}
+ private class WorkOrderEvent extends FRMEvent {
+ private FlowEntryDistributionOrder fe;
+ private FlowEntryInstall newEntry;
+
+ /**
+ * @param fe
+ * @param newEntry
+ */
+ WorkOrderEvent(FlowEntryDistributionOrder fe, FlowEntryInstall newEntry) {
+ this.fe = fe;
+ this.newEntry = newEntry;
+ }
+
+ /**
+ * @return the fe
+ */
+ public FlowEntryDistributionOrder getFe() {
+ return fe;
+ }
+
+ /**
+ * @return the newEntry
+ */
+ public FlowEntryInstall getNewEntry() {
+ return newEntry;
+ }
+ }
+
+ private class WorkStatusCleanup extends FRMEvent {
+ private FlowEntryDistributionOrder fe;
+
+ /**
+ * @param fe
+ */
+ WorkStatusCleanup(FlowEntryDistributionOrder fe) {
+ this.fe = fe;
+ }
+
+ /**
+ * @return the fe
+ */
+ public FlowEntryDistributionOrder getFe() {
+ return fe;
+ }
+ }
+
/*
* OSGI COMMANDS
*/
return rv;
}
+
+ public void unsetIConnectionManager(IConnectionManager s) {
+ if (s == this.connectionManager) {
+ this.connectionManager = null;
+ }
+ }
+
+ public void setIConnectionManager(IConnectionManager s) {
+ this.connectionManager = s;
+ }
+
+ @Override
+ public void entryCreated(Object key, String cacheName, boolean originLocal) {
+ /*
+ * Do nothing
+ */
+ }
+
+ @Override
+ public void entryUpdated(Object key, Object new_value, String cacheName, boolean originLocal) {
+ if (originLocal) {
+ /*
+ * Local updates are of no interest
+ */
+ return;
+ }
+ if (cacheName.equals(WORKORDERCACHE)) {
+ logsync.trace("Got a WorkOrderCacheUpdate for {}", key);
+ /*
+ * This is the case of one workOrder becoming available, so we need
+ * to dispatch the work to the appropriate handler
+ */
+ FlowEntryDistributionOrder fe = (FlowEntryDistributionOrder) key;
+ FlowEntryInstall fei = fe.getEntry();
+ if (fei == null) {
+ return;
+ }
+ Node n = fei.getNode();
+ if (connectionManager.isLocal(n)) {
+ logsync.trace("workOrder for fe {} processed locally", fe);
+ // I'm the controller in charge for the request, queue it for
+ // processing
+ pendingEvents.offer(new WorkOrderEvent(fe, (FlowEntryInstall) new_value));
+ }
+ } else if (cacheName.equals(WORKSTATUSCACHE)) {
+ logsync.trace("Got a WorkStatusCacheUpdate for {}", key);
+ /*
+ * This is the case of one workOrder being completed and a status
+ * returned
+ */
+ FlowEntryDistributionOrder fe = (FlowEntryDistributionOrder) key;
+ /*
+ * Check if the order was initiated by this controller in that case
+ * we need to actually look at the status returned
+ */
+ if (fe.getRequestorController()
+ .equals(clusterContainerService.getMyAddress())) {
+ FlowEntryDistributionOrderFutureTask fet = workMonitor.get(fe);
+ if (fet != null) {
+ logsync.trace("workStatus response is for us {}", fe);
+ // Signal we got the status
+ fet.gotStatus(fe, workStatus.get(fe));
+ pendingEvents.offer(new WorkStatusCleanup(fe));
+ }
+ }
+ }
+ }
+
+ @Override
+ public void entryDeleted(Object key, String cacheName, boolean originLocal) {
+ /*
+ * Do nothing
+ */
+ }
}