import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
private ConcurrentMap<FlowEntryDistributionOrder, FlowEntryDistributionOrderFutureTask> workMonitor =
new ConcurrentHashMap<FlowEntryDistributionOrder, FlowEntryDistributionOrderFutureTask>();
+ /*
+ * Create an executor pool to create the distributionOrder. This is a
+ * stop-gap solution for an issue with non-transactional caches in the
+ * implementation we use, which is currently being worked on. It has
+ * been noticed that when non-transactional caches are used, sometimes
+ * the keys are not distributed to all the nodes properly. To work
+ * around the issue transactional caches are being used, but there was a
+ * reason for using non-transactional caches to start with: we needed to
+ * be able, in the context of a northbound transaction, to program the
+ * FRM entries irrespective of whether that transaction would commit or
+ * not, else we would not be able to achieve the entry programming and
+ * implement the scheme for recovery from network element failures.
+ * Bottom line: now, in order to make sure an update on a transactional
+ * cache goes out while in a transaction, it needs to be initiated by a
+ * different thread.
+ */
+ private ExecutorService executor;
+
+ class DistributeOrderCallable implements Callable<Future<Status>> {
+ private FlowEntryInstall e;
+ private FlowEntryInstall u;
+ private UpdateType t;
+ DistributeOrderCallable(FlowEntryInstall e, FlowEntryInstall u, UpdateType t) {
+ this.e = e;
+ this.u = u;
+ this.t = t;
+ }
+
+ @Override
+ public Future<Status> call() throws Exception {
+ if (e == null || t == null) {
+ logsync.error("Unexpected null Entry up update type");
+ return null;
+ }
+ // Create the work order and distribute it
+ FlowEntryDistributionOrder fe =
+ new FlowEntryDistributionOrder(e, t, clusterContainerService.getMyAddress());
+ // First create the monitor job
+ FlowEntryDistributionOrderFutureTask ret = new FlowEntryDistributionOrderFutureTask(fe);
+ logsync.trace("Node {} not local so sending fe {}", e.getNode(), fe);
+ workMonitor.put(fe, ret);
+ if (t.equals(UpdateType.CHANGED)) {
+ // Then distribute the work
+ workOrder.put(fe, u);
+ } else {
+ // Then distribute the work
+ workOrder.put(fe, e);
+ }
+ logsync.trace("WorkOrder requested");
+ // Now create an Handle to monitor the execution of the operation
+ return ret;
+ }
+ }
+
/**
* @param e
* Entry being installed/updated/removed
Node n = e.getNode();
if (!connectionManager.isLocal(n)) {
- // Create the work order and distribute it
- FlowEntryDistributionOrder fe =
- new FlowEntryDistributionOrder(e, t, clusterContainerService.getMyAddress());
- // First create the monitor job
- FlowEntryDistributionOrderFutureTask ret = new FlowEntryDistributionOrderFutureTask(fe);
- logsync.trace("Node {} not local so sending fe {}", n, fe);
- workMonitor.put(fe, ret);
- if (t.equals(UpdateType.CHANGED)) {
- // Then distribute the work
- workOrder.put(fe, u);
- } else {
- // Then distribute the work
- workOrder.put(fe, e);
+ Callable<Future<Status>> worker = new DistributeOrderCallable(e, u, t);
+ if (worker != null) {
+ Future<Future<Status>> workerRes = this.executor.submit(worker);
+ try {
+ return workerRes.get();
+ } catch (InterruptedException e1) {
+ // We were interrupted; not a big deal.
+ return null;
+ } catch (ExecutionException e1) {
+ logsync.error(
+ "We got an execution exception {} we cannot much, so returning we don't have nothing to wait for",
+ e);
+ return null;
+ }
}
- logsync.trace("WorkOrder requested");
- // Now create an Handle to monitor the execution of the operation
- return ret;
}
logsync.trace("LOCAL Node {} so processing Entry:{} UpdateType:{}", n, e, t);
-
return null;
}
EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
clusterContainerService.createCache(WORKSTATUSCACHE,
- EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
+ EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
clusterContainerService.createCache(WORKORDERCACHE,
- EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
+ EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
} catch (CacheConfigException cce) {
log.error("CacheConfigException");
// Initialize graceful stop flag
stopping = false;
+ // Allocate the executor service
+ this.executor = Executors.newSingleThreadExecutor();
+
// Start event handler thread
frmEventHandler.start();
void stop() {
stopping = true;
uninstallAllFlowEntries(false);
+ // Shutdown executor
+ this.executor.shutdownNow();
}
public void setFlowProgrammerService(IFlowProgrammerService service) {
import org.opendaylight.controller.sal.connection.IPluginInConnectionService;
import org.opendaylight.controller.sal.connection.IPluginOutConnectionService;
import org.opendaylight.controller.sal.core.ComponentActivatorAbstractBase;
+import org.opendaylight.controller.sal.core.IContainerAware;
import org.opendaylight.controller.sal.core.IContainerListener;
import org.opendaylight.controller.sal.core.Node;
import org.opendaylight.controller.sal.flowprogrammer.IPluginInFlowProgrammerService;
// by SAL
props.put(GlobalConstants.PROTOCOLPLUGINTYPE.toString(), Node.NodeIDType.OPENFLOW);
c.setInterface(
- new String[] {
- IPluginInFlowProgrammerService.class.getName(),
- IMessageListener.class.getName(),
- IContainerListener.class.getName(),
- IInventoryShimExternalListener.class.getName() },
- props);
+ new String[] { IPluginInFlowProgrammerService.class.getName(), IMessageListener.class.getName(),
+ IContainerListener.class.getName(), IInventoryShimExternalListener.class.getName(),
+ IContainerAware.class.getName() }, props);
c.add(createServiceDependency()
.setService(IController.class, "(name=Controller)")
if (imp.equals(ReadServiceFilter.class)) {
- c.setInterface(new String[] {
- IReadServiceFilter.class.getName(),
- IContainerListener.class.getName(),
- IOFStatisticsListener.class.getName() }, null);
+ c.setInterface(new String[] { IReadServiceFilter.class.getName(), IContainerListener.class.getName(),
+ IOFStatisticsListener.class.getName(), IContainerAware.class.getName() }, null);
c.add(createServiceDependency()
.setService(IController.class, "(name=Controller)")
if (imp.equals(DiscoveryService.class)) {
// export the service
c.setInterface(
- new String[] {
- IInventoryShimExternalListener.class.getName(),
- IDataPacketListen.class.getName(),
+ new String[] { IInventoryShimExternalListener.class.getName(), IDataPacketListen.class.getName(),
IContainerListener.class.getName() }, null);
c.add(createServiceDependency()
// DataPacket mux/demux services, which is teh actual engine
// doing the packet switching
if (imp.equals(DataPacketMuxDemux.class)) {
- c.setInterface(new String[] { IDataPacketMux.class.getName(),
- IContainerListener.class.getName(),
- IInventoryShimExternalListener.class.getName() }, null);
+ c.setInterface(new String[] { IDataPacketMux.class.getName(), IContainerListener.class.getName(),
+ IInventoryShimExternalListener.class.getName(), IContainerAware.class.getName() }, null);
c.add(createServiceDependency()
.setService(IController.class, "(name=Controller)")
if (imp.equals(InventoryServiceShim.class)) {
c.setInterface(new String[] { IContainerListener.class.getName(),
- IOFStatisticsListener.class.getName()}, null);
+ IOFStatisticsListener.class.getName(), IContainerAware.class.getName() }, null);
c.add(createServiceDependency()
.setService(IController.class, "(name=Controller)")
}
if (imp.equals(TopologyServiceShim.class)) {
- c.setInterface(new String[] { IDiscoveryListener.class.getName(),
- IContainerListener.class.getName(),
- IRefreshInternalProvider.class.getName(),
- IInventoryShimExternalListener.class.getName() }, null);
+ c.setInterface(new String[] { IDiscoveryListener.class.getName(), IContainerListener.class.getName(),
+ IRefreshInternalProvider.class.getName(), IInventoryShimExternalListener.class.getName(),
+ IContainerAware.class.getName() }, null);
c.add(createServiceDependency()
.setService(ITopologyServiceShimListener.class)
.setCallbacks("setTopologyServiceShimListener",
package org.opendaylight.controller.protocol_plugin.openflow.internal;
import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.opendaylight.controller.protocol_plugin.openflow.core.IController;
import org.opendaylight.controller.protocol_plugin.openflow.core.IMessageListener;
import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitch;
-import org.openflow.protocol.OFMessage;
-import org.openflow.protocol.OFPacketIn;
-import org.openflow.protocol.OFPacketOut;
-import org.openflow.protocol.OFPort;
-import org.openflow.protocol.OFType;
-import org.openflow.protocol.action.OFAction;
-import org.openflow.protocol.action.OFActionOutput;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import org.opendaylight.controller.sal.connection.IPluginOutConnectionService;
import org.opendaylight.controller.sal.core.ConstructionException;
import org.opendaylight.controller.sal.core.ContainerFlow;
+import org.opendaylight.controller.sal.core.IContainerAware;
import org.opendaylight.controller.sal.core.IContainerListener;
import org.opendaylight.controller.sal.core.Node;
import org.opendaylight.controller.sal.core.NodeConnector;
import org.opendaylight.controller.sal.packet.RawPacket;
import org.opendaylight.controller.sal.utils.GlobalConstants;
import org.opendaylight.controller.sal.utils.HexEncode;
+import org.openflow.protocol.OFMessage;
+import org.openflow.protocol.OFPacketIn;
+import org.openflow.protocol.OFPacketOut;
+import org.openflow.protocol.OFPort;
+import org.openflow.protocol.OFType;
+import org.openflow.protocol.action.OFAction;
+import org.openflow.protocol.action.OFActionOutput;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class DataPacketMuxDemux implements IContainerListener,
- IMessageListener, IDataPacketMux, IInventoryShimExternalListener {
+ IMessageListener, IDataPacketMux, IInventoryShimExternalListener, IContainerAware {
protected static final Logger logger = LoggerFactory
.getLogger(DataPacketMuxDemux.class);
private IController controller = null;
}
switch (t) {
case ADDED:
- if (!fSpecs.contains(previousFlow)) {
- fSpecs.add(previousFlow);
+ if (!fSpecs.contains(currentFlow)) {
+ fSpecs.add(currentFlow);
}
+ container2FlowSpecs.put(containerName, fSpecs);
break;
case REMOVED:
- if (fSpecs.contains(previousFlow)) {
- fSpecs.remove(previousFlow);
- }
+ fSpecs.remove(currentFlow);
break;
case CHANGED:
break;
UpdateType type, Set<Property> props) {
// do nothing
}
+
+ /**
+ * Callback invoked when a new container is created; no setup is
+ * required here for this component.
+ *
+ * @param containerName name of the container being created
+ */
+ @Override
+ public void containerCreate(String containerName) {
+ // do nothing
+ }
+
+ /**
+ * Callback invoked when a container is destroyed: removes the container
+ * from every node connector's membership list (dropping node connectors
+ * that no longer belong to any container) and discards the container's
+ * flow specs.
+ *
+ * @param containerName name of the container being destroyed
+ */
+ @Override
+ public void containerDestroy(String containerName) {
+ // First pass: collect the node connectors that reference this
+ // container, so the map is not mutated while being iterated.
+ Set<NodeConnector> removeNodeConnectorSet = new HashSet<NodeConnector>();
+ for (Map.Entry<NodeConnector, List<String>> entry : nc2Container.entrySet()) {
+ List<String> ncContainers = entry.getValue();
+ if (ncContainers.contains(containerName)) {
+ NodeConnector nodeConnector = entry.getKey();
+ removeNodeConnectorSet.add(nodeConnector);
+ }
+ }
+ // Second pass: prune the container from each collected node
+ // connector, removing the map entry entirely when empty.
+ for (NodeConnector nodeConnector : removeNodeConnectorSet) {
+ List<String> ncContainers = nc2Container.get(nodeConnector);
+ ncContainers.remove(containerName);
+ if (ncContainers.isEmpty()) {
+ nc2Container.remove(nodeConnector);
+ }
+ }
+ container2FlowSpecs.remove(containerName);
+ }
}
import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitch;
import org.opendaylight.controller.sal.connection.IPluginOutConnectionService;
import org.opendaylight.controller.sal.core.ContainerFlow;
+import org.opendaylight.controller.sal.core.IContainerAware;
import org.opendaylight.controller.sal.core.IContainerListener;
import org.opendaylight.controller.sal.core.Node;
import org.opendaylight.controller.sal.core.Node.NodeIDType;
*/
public class FlowProgrammerService implements IPluginInFlowProgrammerService,
IMessageListener, IContainerListener, IInventoryShimExternalListener,
- CommandProvider {
+ CommandProvider, IContainerAware {
private static final Logger log = LoggerFactory
.getLogger(FlowProgrammerService.class);
private IController controller;
@Override
public void nodeConnectorUpdated(String containerName, NodeConnector p,
UpdateType type) {
- Set<NodeConnector> target = null;
-
switch (type) {
case ADDED:
if (!containerToNc.containsKey(containerName)) {
case CHANGED:
break;
case REMOVED:
- target = containerToNc.get(containerName);
+ Set<NodeConnector> target = containerToNc.get(containerName);
if (target != null) {
target.remove(p);
}
ci.println("Max num of async messages sent prior to the Barrier message is "
+ barrierMessagePriorCount);
}
+
+ /**
+ * Callback invoked when a new container is created; no setup is
+ * required here for this component.
+ *
+ * @param containerName name of the container being created
+ */
+ @Override
+ public void containerCreate(String containerName) {
+ // do nothing
+ }
+
+ /**
+ * Callback invoked when a container is destroyed: drops the set of node
+ * connectors tracked for that container.
+ *
+ * @param containerName name of the container being destroyed
+ */
+ @Override
+ public void containerDestroy(String containerName) {
+ containerToNc.remove(containerName);
+ }
}
import org.opendaylight.controller.sal.core.Capabilities;
import org.opendaylight.controller.sal.core.ContainerFlow;
import org.opendaylight.controller.sal.core.Description;
+import org.opendaylight.controller.sal.core.IContainerAware;
import org.opendaylight.controller.sal.core.IContainerListener;
import org.opendaylight.controller.sal.core.MacAddress;
import org.opendaylight.controller.sal.core.Node;
*
*/
public class InventoryServiceShim implements IContainerListener,
- IMessageListener, ISwitchStateListener, IOFStatisticsListener {
+ IMessageListener, ISwitchStateListener, IOFStatisticsListener, IContainerAware {
protected static final Logger logger = LoggerFactory
.getLogger(InventoryServiceShim.class);
private IController controller = null;
}
void setInventoryShimExternalListener(IInventoryShimExternalListener s) {
- logger.trace("Set inventoryShimExternalListener");
+ logger.trace("Set inventoryShimExternalListener {}", s);
if ((this.inventoryShimExternalListeners != null)
&& !this.inventoryShimExternalListeners.contains(s)) {
this.inventoryShimExternalListeners.add(s);
}
void unsetInventoryShimExternalListener(IInventoryShimExternalListener s) {
+ logger.trace("Unset inventoryShimExternalListener {}", s);
if ((this.inventoryShimExternalListeners != null)
&& this.inventoryShimExternalListeners.contains(s)) {
this.inventoryShimExternalListeners.remove(s);
public void tableStatisticsRefreshed(Long switchId, List<OFStatistics> tables) {
// Nothing to do
}
+
+ /**
+ * Callback invoked when a new container is created; no setup is
+ * required here for this component.
+ *
+ * @param containerName name of the container being created
+ */
+ @Override
+ public void containerCreate(String containerName) {
+ // Nothing to do
+ }
+
+ /**
+ * Callback invoked when a container is destroyed: removes the container
+ * from both the per-node-connector and per-node membership maps,
+ * dropping map entries that no longer belong to any container.
+ *
+ * @param containerName name of the container being destroyed
+ */
+ @Override
+ public void containerDestroy(String containerName) {
+ // First pass: collect affected node connectors and nodes, so the
+ // maps are not mutated while being iterated.
+ Set<NodeConnector> removeNodeConnectorSet = new HashSet<NodeConnector>();
+ Set<Node> removeNodeSet = new HashSet<Node>();
+ for (Map.Entry<NodeConnector, Set<String>> entry : nodeConnectorContainerMap.entrySet()) {
+ Set<String> ncContainers = entry.getValue();
+ if (ncContainers.contains(containerName)) {
+ NodeConnector nodeConnector = entry.getKey();
+ removeNodeConnectorSet.add(nodeConnector);
+ }
+ }
+ for (Map.Entry<Node, Set<String>> entry : nodeContainerMap.entrySet()) {
+ Set<String> nodeContainers = entry.getValue();
+ if (nodeContainers.contains(containerName)) {
+ Node node = entry.getKey();
+ removeNodeSet.add(node);
+ }
+ }
+ // Second pass: prune the container from each collected entry,
+ // removing the map entry entirely once its set is empty.
+ for (NodeConnector nodeConnector : removeNodeConnectorSet) {
+ Set<String> ncContainers = nodeConnectorContainerMap.get(nodeConnector);
+ ncContainers.remove(containerName);
+ if (ncContainers.isEmpty()) {
+ nodeConnectorContainerMap.remove(nodeConnector);
+ }
+ }
+ for (Node node : removeNodeSet) {
+ Set<String> nodeContainers = nodeContainerMap.get(node);
+ nodeContainers.remove(containerName);
+ if (nodeContainers.isEmpty()) {
+ nodeContainerMap.remove(node);
+ }
+ }
+ }
}
import org.opendaylight.controller.sal.action.ActionType;
import org.opendaylight.controller.sal.action.Output;
import org.opendaylight.controller.sal.core.ContainerFlow;
+import org.opendaylight.controller.sal.core.IContainerAware;
import org.opendaylight.controller.sal.core.IContainerListener;
import org.opendaylight.controller.sal.core.Node;
import org.opendaylight.controller.sal.core.NodeConnector;
* Read Service shim layer which is in charge of filtering the flow statistics
* based on container. It is a Global instance.
*/
-public class ReadServiceFilter implements IReadServiceFilter, IContainerListener, IOFStatisticsListener {
+public class ReadServiceFilter implements IReadServiceFilter, IContainerListener, IOFStatisticsListener, IContainerAware {
private static final Logger logger = LoggerFactory
.getLogger(ReadServiceFilter.class);
private IController controller = null;
l.getValue().nodeTableStatisticsUpdated(node, tableStatsList);
}
}
+
+ /**
+ * Callback invoked when a new container is created; no setup is
+ * required here for this component.
+ *
+ * @param containerName name of the container being created
+ */
+ @Override
+ public void containerCreate(String containerName) {
+ // do nothing
+ }
+
+ /**
+ * Callback invoked when a container is destroyed: drops all the
+ * per-container state held by the filter (node connectors, nodes, node
+ * tables and container flows).
+ *
+ * @param containerName name of the container being destroyed
+ */
+ @Override
+ public void containerDestroy(String containerName) {
+ containerToNc.remove(containerName);
+ containerToNode.remove(containerName);
+ containerToNt.remove(containerName);
+ containerFlows.remove(containerName);
+ }
}
import org.opendaylight.controller.protocol_plugin.openflow.IOFStatisticsManager;
import org.opendaylight.controller.protocol_plugin.openflow.IRefreshInternalProvider;
import org.opendaylight.controller.protocol_plugin.openflow.ITopologyServiceShimListener;
-import org.osgi.framework.BundleContext;
-import org.osgi.framework.FrameworkUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import org.opendaylight.controller.sal.core.Bandwidth;
import org.opendaylight.controller.sal.core.Config;
import org.opendaylight.controller.sal.core.ContainerFlow;
import org.opendaylight.controller.sal.core.Edge;
+import org.opendaylight.controller.sal.core.IContainerAware;
import org.opendaylight.controller.sal.core.IContainerListener;
import org.opendaylight.controller.sal.core.Node;
import org.opendaylight.controller.sal.core.NodeConnector;
import org.opendaylight.controller.sal.core.UpdateType;
import org.opendaylight.controller.sal.topology.TopoEdgeUpdate;
import org.opendaylight.controller.sal.utils.GlobalConstants;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.FrameworkUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* The class describes a shim layer that relays the topology events from
*/
public class TopologyServiceShim implements IDiscoveryListener,
IContainerListener, CommandProvider, IRefreshInternalProvider,
- IInventoryShimExternalListener {
+ IInventoryShimExternalListener, IContainerAware {
protected static final Logger logger = LoggerFactory
.getLogger(TopologyServiceShim.class);
private ConcurrentMap<String, ITopologyServiceShimListener> topologyServiceShimListeners = new ConcurrentHashMap<String, ITopologyServiceShimListener>();
break;
}
}
+
+ /**
+ * Callback invoked when a new container is created; no setup is
+ * required here for this component.
+ *
+ * @param containerName name of the container being created
+ */
+ @Override
+ public void containerCreate(String containerName) {
+ // do nothing
+ }
+
+ /**
+ * Callback invoked when a container is destroyed: removes the container
+ * from every node connector's membership list (dropping node connectors
+ * that no longer belong to any container) and discards the container's
+ * edge map.
+ *
+ * @param containerName name of the container being destroyed
+ */
+ @Override
+ public void containerDestroy(String containerName) {
+ // First pass: collect the node connectors that reference this
+ // container, so the map is not mutated while being iterated.
+ Set<NodeConnector> removeNodeConnectorSet = new HashSet<NodeConnector>();
+ for (Map.Entry<NodeConnector, List<String>> entry : containerMap.entrySet()) {
+ List<String> ncContainers = entry.getValue();
+ if (ncContainers.contains(containerName)) {
+ NodeConnector nodeConnector = entry.getKey();
+ removeNodeConnectorSet.add(nodeConnector);
+ }
+ }
+ // Second pass: prune the container from each collected node
+ // connector, removing the map entry entirely when empty.
+ for (NodeConnector nodeConnector : removeNodeConnectorSet) {
+ List<String> ncContainers = containerMap.get(nodeConnector);
+ ncContainers.remove(containerName);
+ if (ncContainers.isEmpty()) {
+ containerMap.remove(nodeConnector);
+ }
+ }
+ edgeMap.remove(containerName);
+ }
}
private static final String WEB_ID = "flows";
private static final short WEB_ORDER = 2;
+ private final Gson gson;
+
public Flows() {
ServiceHelper.registerGlobalService(IDaylightWeb.class, this, null);
+ // Reuse a single Gson instance for all requests instead of
+ // allocating one per call (Gson instances are thread-safe —
+ // see the Gson documentation).
+ gson = new Gson();
}
@Override
return null;
}
- Gson gson = new Gson();
FlowConfig flow = gson.fromJson(body, FlowConfig.class);
+
Node node = Node.fromString(nodeId);
flow.setNode(node);
Status result = new Status(StatusCode.BADREQUEST, "Invalid request");
return "Forwarding Rules Manager is not available";
}
- Gson gson = new Gson();
List<Map<String, String>> flowList = new ArrayList<Map<String, String>>();
flowList = gson.fromJson(body, flowList.getClass());
Status result = new Status(StatusCode.BADREQUEST, "Invalid request");
var flows = one.f.flows.registry.flows;
var flow;
$(flows).each(function(index, value) {
- if (value['name'] == id) {
- flow = value;
- }
+ if (value.name == id && value.nodeId == node) {
+ flow = value;
+ }
});
if (one.f.flows.registry.privilege === 'WRITE') {
// remove button
data: data.flows,
formatter: function(items) {
$.each(items, function(index, item) {
- var $checkbox = document.createElement("input");
+ var $checkbox = document.createElement("input");
$checkbox.setAttribute("type", "checkbox");
$checkbox.setAttribute("name", item.name);
- $checkbox.setAttribute("node", item.node);
+ $checkbox.setAttribute("node", item.nodeId);
$checkbox.setAttribute('class','flowEntry')
item.selector = $checkbox.outerHTML;
item["name"] = '<span data-installInHw=' + item["flow"]["installInHw"] +
// activate first tab on each dashlet
$('.dash .nav').each(function(index, value) {
$($(value).find('li')[0]).find('a').click();
-});
\ No newline at end of file
+});