import org.opendaylight.controller.clustering.services.IClusterContainerServices;
import org.opendaylight.controller.clustering.services.IClusterServices;
import org.opendaylight.controller.configuration.IConfigurationContainerAware;
-import org.opendaylight.controller.sal.connection.ConnectionLocality;
import org.opendaylight.controller.connectionmanager.IConnectionManager;
+import org.opendaylight.controller.containermanager.IContainerManager;
import org.opendaylight.controller.forwardingrulesmanager.FlowConfig;
import org.opendaylight.controller.forwardingrulesmanager.FlowEntry;
import org.opendaylight.controller.forwardingrulesmanager.FlowEntryInstall;
import org.opendaylight.controller.sal.action.Flood;
import org.opendaylight.controller.sal.action.Output;
import org.opendaylight.controller.sal.action.PopVlan;
+import org.opendaylight.controller.sal.connection.ConnectionLocality;
import org.opendaylight.controller.sal.core.Config;
import org.opendaylight.controller.sal.core.ContainerFlow;
import org.opendaylight.controller.sal.core.IContainer;
-import org.opendaylight.controller.sal.core.IContainerListener;
+import org.opendaylight.controller.sal.core.IContainerLocalListener;
import org.opendaylight.controller.sal.core.Node;
import org.opendaylight.controller.sal.core.NodeConnector;
import org.opendaylight.controller.sal.core.Property;
public class ForwardingRulesManager implements
IForwardingRulesManager,
PortGroupChangeListener,
- IContainerListener,
+ IContainerLocalListener,
ISwitchManagerAware,
IConfigurationContainerAware,
IInventoryListener,
private ConcurrentMap<String, PortGroupConfig> portGroupConfigs;
private ConcurrentMap<PortGroupConfig, Map<Node, PortGroup>> portGroupData;
private ConcurrentMap<String, Object> TSPolicies;
+ private IContainerManager containerManager;
private boolean inContainerMode; // being used by global instance only
- private boolean stopping;
+ protected boolean stopping;
/*
* Flow database. It's the software view of what was requested to install
* not picked by anyone, which is always a case can happen especially on
* Node disconnect cases.
*/
- private ConcurrentMap<FlowEntryDistributionOrder, FlowEntryInstall> workOrder;
+ protected ConcurrentMap<FlowEntryDistributionOrder, FlowEntryInstall> workOrder;
/*
* Data structure responsible for retrieving the results of the workOrder
* TODO: The workStatus entries need to have a lifetime associated in case
* of requestor controller leaving the cluster.
*/
- private ConcurrentMap<FlowEntryDistributionOrder, Status> workStatus;
+ protected ConcurrentMap<FlowEntryDistributionOrder, Status> workStatus;
/*
* Local Map used to hold the Future which a caller can use to monitor for
private ConcurrentMap<FlowEntryDistributionOrder, FlowEntryDistributionOrderFutureTask> workMonitor =
new ConcurrentHashMap<FlowEntryDistributionOrder, FlowEntryDistributionOrderFutureTask>();
+ /*
+ * Max pool size for the executor
+ */
+ private static final int maxPoolSize = 10;
+
/**
* @param e
* Entry being installed/updated/removed
* merged flow may conflict with an existing old container flows merged flow
* on the network node
*/
- private void updateFlowsContainerFlow() {
+ protected void updateFlowsContainerFlow() {
Set<FlowEntry> toReInstall = new HashSet<FlowEntry>();
// First remove all installed entries
for (ConcurrentMap.Entry<FlowEntryInstall, FlowEntryInstall> entry : installedSwView.entrySet()) {
clusterContainerService.createCache("frm.staticFlows",
EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
- clusterContainerService.createCache("frm.flowsSaveEvent",
- EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
-
clusterContainerService.createCache("frm.staticFlowsOrdinal",
EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
clusterContainerService.createCache(WORKSTATUSCACHE,
- EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
+ EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL, IClusterServices.cacheMode.ASYNC));
clusterContainerService.createCache(WORKORDERCACHE,
- EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
+ EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL, IClusterServices.cacheMode.ASYNC));
} catch (CacheConfigException cce) {
log.error("CacheConfigException");
public void run() {
while (!stopping) {
try {
- FRMEvent event = pendingEvents.take();
+ final FRMEvent event = pendingEvents.take();
if (event == null) {
log.warn("Dequeued null event");
continue;
}
+ log.trace("Dequeued {} event", event.getClass().getSimpleName());
if (event instanceof NodeUpdateEvent) {
NodeUpdateEvent update = (NodeUpdateEvent) event;
Node node = update.getNode();
/*
* Take care of handling the remote Work request
*/
- WorkOrderEvent work = (WorkOrderEvent) event;
- FlowEntryDistributionOrder fe = work.getFe();
- if (fe != null) {
- logsync.trace("Executing the workOrder {}", fe);
- Status gotStatus = null;
- FlowEntryInstall feiCurrent = fe.getEntry();
- FlowEntryInstall feiNew = workOrder.get(fe.getEntry());
- switch (fe.getUpType()) {
- case ADDED:
- /*
- * TODO: Not still sure how to handle the
- * sync entries
- */
- gotStatus = addEntriesInternal(feiCurrent, true);
- break;
- case CHANGED:
- gotStatus = modifyEntryInternal(feiCurrent, feiNew, true);
- break;
- case REMOVED:
- gotStatus = removeEntryInternal(feiCurrent, true);
- break;
+ Runnable r = new Runnable() {
+ @Override
+ public void run() {
+ WorkOrderEvent work = (WorkOrderEvent) event;
+ FlowEntryDistributionOrder fe = work.getFe();
+ if (fe != null) {
+ logsync.trace("Executing the workOrder {}", fe);
+ Status gotStatus = null;
+ FlowEntryInstall feiCurrent = fe.getEntry();
+ FlowEntryInstall feiNew = workOrder.get(fe);
+ switch (fe.getUpType()) {
+ case ADDED:
+ gotStatus = addEntriesInternal(feiCurrent, false);
+ break;
+ case CHANGED:
+ gotStatus = modifyEntryInternal(feiCurrent, feiNew, false);
+ break;
+ case REMOVED:
+ gotStatus = removeEntryInternal(feiCurrent, false);
+ break;
+ }
+ // Remove the Order
+ workOrder.remove(fe);
+ logsync.trace(
+ "The workOrder has been executed and now the status is being returned {}", fe);
+ // Place the status
+ workStatus.put(fe, gotStatus);
+ } else {
+ log.warn("Not expected null WorkOrder", work);
+ }
}
- // Remove the Order
- workOrder.remove(fe);
- logsync.trace(
- "The workOrder has been executed and now the status is being returned {}", fe);
- // Place the status
- workStatus.put(fe, gotStatus);
- } else {
- log.warn("Not expected null WorkOrder", work);
+ };
+ if(executor != null) {
+ executor.execute(r);
}
} else if (event instanceof WorkStatusCleanup) {
/*
*
*/
void start() {
+ /*
+ * If running in default container, need to know if controller is in
+ * container mode
+ */
+ if (GlobalConstants.DEFAULT.toString().equals(this.getContainerName())) {
+ inContainerMode = containerManager.inContainerMode();
+ }
+
// Initialize graceful stop flag
stopping = false;
// Allocate the executor service
- this.executor = Executors.newSingleThreadExecutor();
+ this.executor = Executors.newFixedThreadPool(maxPoolSize);
// Start event handler thread
frmEventHandler.start();
this.connectionManager = s;
}
+ public void unsetIContainerManager(IContainerManager s) {
+ if (s == this.containerManager) {
+ this.containerManager = null;
+ }
+ }
+
+ public void setIContainerManager(IContainerManager s) {
+ this.containerManager = s;
+ }
+
@Override
public void entryCreated(Object key, String cacheName, boolean originLocal) {
/*