import org.opendaylight.controller.clustering.services.IClusterServices;
import org.opendaylight.controller.configuration.IConfigurationContainerAware;
import org.opendaylight.controller.connectionmanager.IConnectionManager;
+import org.opendaylight.controller.containermanager.IContainerManager;
import org.opendaylight.controller.forwardingrulesmanager.FlowConfig;
import org.opendaylight.controller.forwardingrulesmanager.FlowEntry;
import org.opendaylight.controller.forwardingrulesmanager.FlowEntryInstall;
private ConcurrentMap<String, PortGroupConfig> portGroupConfigs;
private ConcurrentMap<PortGroupConfig, Map<Node, PortGroup>> portGroupData;
private ConcurrentMap<String, Object> TSPolicies;
+ private IContainerManager containerManager;
private boolean inContainerMode; // being used by global instance only
protected boolean stopping;
private ConcurrentMap<FlowEntryDistributionOrder, FlowEntryDistributionOrderFutureTask> workMonitor =
new ConcurrentHashMap<FlowEntryDistributionOrder, FlowEntryDistributionOrderFutureTask>();
+ /*
+ * Max pool size for the executor
+ */
+ private static final int maxPoolSize = 10;
+
/**
* @param e
* Entry being installed/updated/removed
public void run() {
while (!stopping) {
try {
- FRMEvent event = pendingEvents.take();
+ final FRMEvent event = pendingEvents.take();
if (event == null) {
log.warn("Dequeued null event");
continue;
/*
* Take care of handling the remote Work request
*/
- WorkOrderEvent work = (WorkOrderEvent) event;
- FlowEntryDistributionOrder fe = work.getFe();
- if (fe != null) {
- logsync.trace("Executing the workOrder {}", fe);
- Status gotStatus = null;
- FlowEntryInstall feiCurrent = fe.getEntry();
- FlowEntryInstall feiNew = workOrder.get(fe);
- switch (fe.getUpType()) {
- case ADDED:
- /*
- * TODO: Not still sure how to handle the
- * sync entries
- */
- gotStatus = addEntriesInternal(feiCurrent, true);
- break;
- case CHANGED:
- gotStatus = modifyEntryInternal(feiCurrent, feiNew, true);
- break;
- case REMOVED:
- gotStatus = removeEntryInternal(feiCurrent, true);
- break;
+ Runnable r = new Runnable() {
+ @Override
+ public void run() {
+ WorkOrderEvent work = (WorkOrderEvent) event;
+ FlowEntryDistributionOrder fe = work.getFe();
+ if (fe != null) {
+ logsync.trace("Executing the workOrder {}", fe);
+ Status gotStatus = null;
+ FlowEntryInstall feiCurrent = fe.getEntry();
+ FlowEntryInstall feiNew = workOrder.get(fe);
+ switch (fe.getUpType()) {
+ case ADDED:
+ gotStatus = addEntriesInternal(feiCurrent, false);
+ break;
+ case CHANGED:
+ gotStatus = modifyEntryInternal(feiCurrent, feiNew, false);
+ break;
+ case REMOVED:
+ gotStatus = removeEntryInternal(feiCurrent, false);
+ break;
+ }
+ // Remove the Order
+ workOrder.remove(fe);
+ logsync.trace(
+ "The workOrder has been executed and now the status is being returned {}", fe);
+ // Place the status
+ workStatus.put(fe, gotStatus);
+ } else {
+ log.warn("Not expected null WorkOrder", work);
+ }
}
- // Remove the Order
- workOrder.remove(fe);
- logsync.trace(
- "The workOrder has been executed and now the status is being returned {}", fe);
- // Place the status
- workStatus.put(fe, gotStatus);
- } else {
- log.warn("Not expected null WorkOrder", work);
+ };
+ if(executor != null) {
+ executor.execute(r);
}
} else if (event instanceof WorkStatusCleanup) {
/*
*
*/
void start() {
+ /*
+ * If running in default container, need to know if controller is in
+ * container mode
+ */
+ if (GlobalConstants.DEFAULT.toString().equals(this.getContainerName())) {
+ inContainerMode = containerManager.inContainerMode();
+ }
+
// Initialize graceful stop flag
stopping = false;
// Allocate the executor service
- this.executor = Executors.newSingleThreadExecutor();
+ this.executor = Executors.newFixedThreadPool(maxPoolSize);
// Start event handler thread
frmEventHandler.start();
this.connectionManager = s;
}
+ public void unsetIContainerManager(IContainerManager s) {
+ if (s == this.containerManager) {
+ this.containerManager = null;
+ }
+ }
+
+ public void setIContainerManager(IContainerManager s) {
+ this.containerManager = s;
+ }
+
@Override
public void entryCreated(Object key, String cacheName, boolean originLocal) {
/*