import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import org.eclipse.osgi.framework.console.CommandInterpreter;
import org.opendaylight.controller.clustering.services.IClusterContainerServices;
import org.opendaylight.controller.clustering.services.IClusterServices;
import org.opendaylight.controller.configuration.IConfigurationContainerAware;
-import org.opendaylight.controller.connectionmanager.ConnectionLocality;
import org.opendaylight.controller.connectionmanager.IConnectionManager;
+import org.opendaylight.controller.containermanager.IContainerManager;
import org.opendaylight.controller.forwardingrulesmanager.FlowConfig;
import org.opendaylight.controller.forwardingrulesmanager.FlowEntry;
import org.opendaylight.controller.forwardingrulesmanager.FlowEntryInstall;
import org.opendaylight.controller.sal.action.Flood;
import org.opendaylight.controller.sal.action.Output;
import org.opendaylight.controller.sal.action.PopVlan;
+import org.opendaylight.controller.sal.connection.ConnectionLocality;
import org.opendaylight.controller.sal.core.Config;
import org.opendaylight.controller.sal.core.ContainerFlow;
import org.opendaylight.controller.sal.core.IContainer;
-import org.opendaylight.controller.sal.core.IContainerListener;
+import org.opendaylight.controller.sal.core.IContainerLocalListener;
import org.opendaylight.controller.sal.core.Node;
import org.opendaylight.controller.sal.core.NodeConnector;
import org.opendaylight.controller.sal.core.Property;
public class ForwardingRulesManager implements
IForwardingRulesManager,
PortGroupChangeListener,
- IContainerListener,
+ IContainerLocalListener,
ISwitchManagerAware,
IConfigurationContainerAware,
IInventoryListener,
private ConcurrentMap<String, PortGroupConfig> portGroupConfigs;
private ConcurrentMap<PortGroupConfig, Map<Node, PortGroup>> portGroupData;
private ConcurrentMap<String, Object> TSPolicies;
+ private IContainerManager containerManager;
private boolean inContainerMode; // being used by global instance only
- private boolean stopping;
+ protected boolean stopping;
/*
* Flow database. It's the software view of what was requested to install
* not picked by anyone, which is always a case can happen especially on
* Node disconnect cases.
*/
- private ConcurrentMap<FlowEntryDistributionOrder, FlowEntryInstall> workOrder;
+ protected ConcurrentMap<FlowEntryDistributionOrder, FlowEntryInstall> workOrder;
/*
* Data structure responsible for retrieving the results of the workOrder
* TODO: The workStatus entries need to have a lifetime associated in case
* of requestor controller leaving the cluster.
*/
- private ConcurrentMap<FlowEntryDistributionOrder, Status> workStatus;
+ protected ConcurrentMap<FlowEntryDistributionOrder, Status> workStatus;
/*
* Local Map used to hold the Future which a caller can use to monitor for
private ConcurrentMap<FlowEntryDistributionOrder, FlowEntryDistributionOrderFutureTask> workMonitor =
new ConcurrentHashMap<FlowEntryDistributionOrder, FlowEntryDistributionOrderFutureTask>();
+ /*
+ * Max pool size for the executor
+ */
+ private static final int maxPoolSize = 10;
+
/**
* @param e
* Entry being installed/updated/removed
* @return a Future object for monitoring the progress of the result, or
* null in case the processing should take place locally
*/
- private Future<Status> distributeWorkOrder(FlowEntryInstall e, FlowEntryInstall u, UpdateType t) {
+ private FlowEntryDistributionOrderFutureTask distributeWorkOrder(FlowEntryInstall e, FlowEntryInstall u,
+ UpdateType t) {
// A null entry it's an unexpected condition, anyway it's safe to keep
// the handling local
if (e == null) {
succeded = ret;
} else {
error = ret;
- log.warn("Failed to install the entry: {}. The failure is: {}", installEntry, ret.getDescription());
+ log.trace("Failed to install the entry: {}. The failure is: {}", installEntry, ret.getDescription());
}
}
* contain the unique id assigned to this request
*/
private Status modifyEntryInternal(FlowEntryInstall currentEntries, FlowEntryInstall newEntries, boolean async) {
- Future<Status> futureStatus = distributeWorkOrder(currentEntries, newEntries, UpdateType.CHANGED);
+ FlowEntryDistributionOrderFutureTask futureStatus =
+ distributeWorkOrder(currentEntries, newEntries, UpdateType.CHANGED);
if (futureStatus != null) {
Status retStatus = new Status(StatusCode.UNDEFINED);
try {
retStatus = futureStatus.get();
+ if (retStatus.getCode()
+ .equals(StatusCode.TIMEOUT)) {
+ // A timeout happened, lets cleanup the workMonitor
+ workMonitor.remove(futureStatus.getOrder());
+ }
} catch (InterruptedException e) {
log.error("", e);
} catch (ExecutionException e) {
.getFlow());
if (!status.isSuccess()) {
- log.warn("SDN Plugin failed to program the flow: {}. The failure is: {}", newEntries.getInstall(),
+ log.trace("SDN Plugin failed to program the flow: {}. The failure is: {}", newEntries.getInstall(),
status.getDescription());
return status;
}
if (!ret.isSuccess()) {
error = ret;
- log.warn("Failed to remove the entry: {}. The failure is: {}", entry.getInstall(), ret.getDescription());
+ log.trace("Failed to remove the entry: {}. The failure is: {}", entry.getInstall(), ret.getDescription());
if (installedList.size() == 1) {
// If we had only one entry to remove, this is fatal failure
return error;
* contain the unique id assigned to this request
*/
private Status removeEntryInternal(FlowEntryInstall entry, boolean async) {
- Future<Status> futureStatus = distributeWorkOrder(entry, null, UpdateType.REMOVED);
+ FlowEntryDistributionOrderFutureTask futureStatus = distributeWorkOrder(entry, null, UpdateType.REMOVED);
if (futureStatus != null) {
Status retStatus = new Status(StatusCode.UNDEFINED);
try {
retStatus = futureStatus.get();
+ if (retStatus.getCode()
+ .equals(StatusCode.TIMEOUT)) {
+ // A timeout happened, lets cleanup the workMonitor
+ workMonitor.remove(futureStatus.getOrder());
+ }
} catch (InterruptedException e) {
log.error("", e);
} catch (ExecutionException e) {
.getFlow());
if (!status.isSuccess()) {
- log.warn("SDN Plugin failed to program the flow: {}. The failure is: {}", entry.getInstall(),
+ log.trace("SDN Plugin failed to remove the flow: {}. The failure is: {}", entry.getInstall(),
status.getDescription());
return status;
}
* contain the unique id assigned to this request
*/
private Status addEntriesInternal(FlowEntryInstall entry, boolean async) {
- Future<Status> futureStatus = distributeWorkOrder(entry, null, UpdateType.ADDED);
+ FlowEntryDistributionOrderFutureTask futureStatus = distributeWorkOrder(entry, null, UpdateType.ADDED);
if (futureStatus != null) {
Status retStatus = new Status(StatusCode.UNDEFINED);
try {
retStatus = futureStatus.get();
+ if (retStatus.getCode()
+ .equals(StatusCode.TIMEOUT)) {
+ // A timeout happened, lets cleanup the workMonitor
+ workMonitor.remove(futureStatus.getOrder());
+ }
} catch (InterruptedException e) {
log.error("", e);
} catch (ExecutionException e) {
.getFlow());
if (!status.isSuccess()) {
- log.warn("SDN Plugin failed to program the flow: {}. The failure is: {}", entry.getInstall(),
+ log.trace("SDN Plugin failed to program the flow: {}. The failure is: {}", entry.getInstall(),
status.getDescription());
return status;
}
return true;
}
+ private ConcurrentMap.Entry<Integer, FlowConfig> getStaticFlowEntry(String name, Node node) {
+ for (ConcurrentMap.Entry<Integer, FlowConfig> flowEntry : staticFlows.entrySet()) {
+ FlowConfig flowConfig = flowEntry.getValue();
+ if (flowConfig.isByNameAndNodeIdEqual(name, node)) {
+ return flowEntry;
+ }
+ }
+ return null;
+ }
+
private void updateLocalDatabase(FlowEntryInstall entry, boolean add) {
// Update the software view
updateSwViewes(entry, add);
updateLocalDatabase(target, false);
} else {
// log the error
- log.warn("SDN Plugin failed to remove the flow: {}. The failure is: {}", target.getInstall(),
+ log.trace("SDN Plugin failed to remove the flow: {}. The failure is: {}", target.getInstall(),
status.getDescription());
}
* merged flow may conflict with an existing old container flows merged flow
* on the network node
*/
- private void updateFlowsContainerFlow() {
+ protected void updateFlowsContainerFlow() {
Set<FlowEntry> toReInstall = new HashSet<FlowEntry>();
// First remove all installed entries
for (ConcurrentMap.Entry<FlowEntryInstall, FlowEntryInstall> entry : installedSwView.entrySet()) {
clusterContainerService.createCache("frm.staticFlows",
EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
- clusterContainerService.createCache("frm.flowsSaveEvent",
- EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
-
clusterContainerService.createCache("frm.staticFlowsOrdinal",
EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
clusterContainerService.createCache(WORKSTATUSCACHE,
- EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
+ EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL, IClusterServices.cacheMode.ASYNC));
clusterContainerService.createCache(WORKORDERCACHE,
- EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
+ EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL, IClusterServices.cacheMode.ASYNC));
} catch (CacheConfigException cce) {
log.error("CacheConfigException");
* If requested, a copy of each original flow entry will be stored in the
* inactive list so that it can be re-applied when needed (This is typically
* the case when running in the default container and controller moved to
- * container mode)
+ * container mode) NOTE WELL: The routine as long as does a bulk change will
+ * operate only on the entries for nodes locally attached so to avoid
+ * redundant operations initiated by multiple nodes
*
* @param preserveFlowEntries
* if true, a copy of each original entry is stored in the
// Now remove the entries
for (FlowEntryInstall flowEntryHw : toRemove) {
- Status status = this.removeEntryInternal(flowEntryHw, false);
- if (!status.isSuccess()) {
- log.warn("Failed to remove entry: {}. The failure is: {}", flowEntryHw, status.getDescription());
+ Node n = flowEntryHw.getNode();
+ if (n != null && connectionManager.getLocalityStatus(n) == ConnectionLocality.LOCAL) {
+ Status status = this.removeEntryInternal(flowEntryHw, false);
+ if (!status.isSuccess()) {
+ log.trace("Failed to remove entry: {}. The failure is: {}", flowEntryHw, status.getDescription());
+ }
+ } else {
+ log.debug("Not removing entry {} because not connected locally, the remote guy will do it's job",
+ flowEntryHw);
}
}
}
@Override
public FlowConfig getStaticFlow(String name, Node node) {
- for (ConcurrentMap.Entry<Integer, FlowConfig> entry : staticFlows.entrySet()) {
- if (entry.getValue().isByNameAndNodeIdEqual(name, node)) {
- return entry.getValue();
- }
+ ConcurrentMap.Entry<Integer, FlowConfig> entry = getStaticFlowEntry(name, node);
+ if(entry != null) {
+ return entry.getValue();
}
return null;
}
public void run() {
while (!stopping) {
try {
- FRMEvent event = pendingEvents.take();
+ final FRMEvent event = pendingEvents.take();
if (event == null) {
log.warn("Dequeued null event");
continue;
}
+ log.trace("Dequeued {} event", event.getClass().getSimpleName());
if (event instanceof NodeUpdateEvent) {
NodeUpdateEvent update = (NodeUpdateEvent) event;
Node node = update.getNode();
/*
* Take care of handling the remote Work request
*/
- WorkOrderEvent work = (WorkOrderEvent) event;
- FlowEntryDistributionOrder fe = work.getFe();
- if (fe != null) {
- logsync.trace("Executing the workOrder {}", fe);
- Status gotStatus = null;
- FlowEntryInstall feiCurrent = fe.getEntry();
- FlowEntryInstall feiNew = workOrder.get(fe.getEntry());
- switch (fe.getUpType()) {
- case ADDED:
- /*
- * TODO: Not still sure how to handle the
- * sync entries
- */
- gotStatus = addEntriesInternal(feiCurrent, true);
- break;
- case CHANGED:
- gotStatus = modifyEntryInternal(feiCurrent, feiNew, true);
- break;
- case REMOVED:
- gotStatus = removeEntryInternal(feiCurrent, true);
- break;
+ Runnable r = new Runnable() {
+ @Override
+ public void run() {
+ WorkOrderEvent work = (WorkOrderEvent) event;
+ FlowEntryDistributionOrder fe = work.getFe();
+ if (fe != null) {
+ logsync.trace("Executing the workOrder {}", fe);
+ Status gotStatus = null;
+ FlowEntryInstall feiCurrent = fe.getEntry();
+ FlowEntryInstall feiNew = workOrder.get(fe);
+ switch (fe.getUpType()) {
+ case ADDED:
+ gotStatus = addEntriesInternal(feiCurrent, false);
+ break;
+ case CHANGED:
+ gotStatus = modifyEntryInternal(feiCurrent, feiNew, false);
+ break;
+ case REMOVED:
+ gotStatus = removeEntryInternal(feiCurrent, false);
+ break;
+ }
+ // Remove the Order
+ workOrder.remove(fe);
+ logsync.trace(
+ "The workOrder has been executed and now the status is being returned {}", fe);
+ // Place the status
+ workStatus.put(fe, gotStatus);
+ } else {
+ log.warn("Not expected null WorkOrder", work);
+ }
}
- // Remove the Order
- workOrder.remove(fe);
- logsync.trace(
- "The workOrder has been executed and now the status is being returned {}", fe);
- // Place the status
- workStatus.put(fe, gotStatus);
- } else {
- log.warn("Not expected null WorkOrder", work);
+ };
+ if(executor != null) {
+ executor.execute(r);
}
} else if (event instanceof WorkStatusCleanup) {
/*
*
*/
void start() {
+ /*
+ * If running in default container, need to know if controller is in
+ * container mode
+ */
+ if (GlobalConstants.DEFAULT.toString().equals(this.getContainerName())) {
+ inContainerMode = containerManager.inContainerMode();
+ }
+
// Initialize graceful stop flag
stopping = false;
// Allocate the executor service
- this.executor = Executors.newSingleThreadExecutor();
+ this.executor = Executors.newFixedThreadPool(maxPoolSize);
// Start event handler thread
frmEventHandler.start();
uninstallAllFlowEntries(false);
// Shutdown executor
this.executor.shutdownNow();
+ // Now walk all the workMonitor and wake up the one sleeping because
+ // destruction is happening
+ for (FlowEntryDistributionOrder fe : workMonitor.keySet()) {
+ FlowEntryDistributionOrderFutureTask task = workMonitor.get(fe);
+ task.cancel(true);
+ }
}
public void setFlowProgrammerService(IFlowProgrammerService service) {
}
}
+ public void _frmProcessErrorEvent(CommandInterpreter ci) throws UnknownHostException {
+ Node node = null;
+ long reqId = 0L;
+ String nodeId = ci.nextArgument();
+ if (nodeId == null) {
+ ci.print("Node id not specified");
+ return;
+ }
+ String requestId = ci.nextArgument();
+ if (requestId == null) {
+ ci.print("Request id not specified");
+ return;
+ }
+ try {
+ node = NodeCreator.createOFNode(Long.valueOf(nodeId));
+ } catch (NumberFormatException e) {
+ ci.print("Node id not a number");
+ return;
+ }
+ try {
+ reqId = Long.parseLong(requestId);
+ } catch (NumberFormatException e) {
+ ci.print("Request id not a number");
+ return;
+ }
+ // null for error object is good enough for now
+ ErrorReportedEvent event = new ErrorReportedEvent(reqId, node, null);
+ this.processErrorEvent(event);
+ }
+
@Override
public void flowRemoved(Node node, Flow flow) {
log.trace("Received flow removed notification on {} for {}", node, flow);
* mapping will have to be added in future
*/
FlowEntryInstall target = null;
- for (FlowEntryInstall index : nodeFlows.get(node)) {
- FlowEntryInstall entry = installedSwView.get(index);
- if (entry.getRequestId() == rid) {
- target = entry;
- break;
+ List<FlowEntryInstall> flowEntryInstallList = nodeFlows.get(node);
+ // flowEntryInstallList could be null.
+ // so check for it.
+ if(flowEntryInstallList != null) {
+ for (FlowEntryInstall index : flowEntryInstallList) {
+ FlowEntryInstall entry = installedSwView.get(index);
+ if(entry != null) {
+ if (entry.getRequestId() == rid) {
+ target = entry;
+ break;
+ }
+ }
}
}
if (target != null) {
// This was a flow install, update database
this.updateLocalDatabase(target, false);
+ // also update the config
+ if(FlowConfig.STATICFLOWGROUP.equals(target.getGroupName())) {
+ ConcurrentMap.Entry<Integer, FlowConfig> staticFlowEntry = getStaticFlowEntry(target.getFlowName(),target.getNode());
+ // staticFlowEntry should never be null.
+ // the null check is just an extra defensive check.
+ if(staticFlowEntry != null) {
+ staticFlows.remove(staticFlowEntry.getKey());
+ }
+ }
}
// Notify listeners
this.connectionManager = s;
}
+ public void unsetIContainerManager(IContainerManager s) {
+ if (s == this.containerManager) {
+ this.containerManager = null;
+ }
+ }
+
+ public void setIContainerManager(IContainerManager s) {
+ this.containerManager = s;
+ }
+
@Override
public void entryCreated(Object key, String cacheName, boolean originLocal) {
/*
*/
if (fe.getRequestorController()
.equals(clusterContainerService.getMyAddress())) {
- FlowEntryDistributionOrderFutureTask fet = workMonitor.get(fe);
+ FlowEntryDistributionOrderFutureTask fet = workMonitor.remove(fe);
if (fet != null) {
logsync.trace("workStatus response is for us {}", fe);
// Signal we got the status