X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fforwardingrulesmanager%2Fimplementation%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fforwardingrulesmanager%2Finternal%2FForwardingRulesManager.java;h=a77224af8a172ed396addb3dbeab3c2dd8939361;hp=c839d15f10c81308909a03f804e5587ec6ef612c;hb=9aa1a4e8c24292301eac8a9fa71f39744c37eda4;hpb=19a34ba9057d07bfbcfb491786ffeb91f5b2c318 diff --git a/opendaylight/forwardingrulesmanager/implementation/src/main/java/org/opendaylight/controller/forwardingrulesmanager/internal/ForwardingRulesManager.java b/opendaylight/forwardingrulesmanager/implementation/src/main/java/org/opendaylight/controller/forwardingrulesmanager/internal/ForwardingRulesManager.java index c839d15f10..a77224af8a 100644 --- a/opendaylight/forwardingrulesmanager/implementation/src/main/java/org/opendaylight/controller/forwardingrulesmanager/internal/ForwardingRulesManager.java +++ b/opendaylight/forwardingrulesmanager/implementation/src/main/java/org/opendaylight/controller/forwardingrulesmanager/internal/ForwardingRulesManager.java @@ -28,7 +28,6 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import org.eclipse.osgi.framework.console.CommandInterpreter; @@ -39,7 +38,7 @@ import org.opendaylight.controller.clustering.services.ICacheUpdateAware; import org.opendaylight.controller.clustering.services.IClusterContainerServices; import org.opendaylight.controller.clustering.services.IClusterServices; import org.opendaylight.controller.configuration.IConfigurationContainerAware; -import org.opendaylight.controller.connectionmanager.ConnectionLocality; +import org.opendaylight.controller.sal.connection.ConnectionLocality; import org.opendaylight.controller.connectionmanager.IConnectionManager; import org.opendaylight.controller.forwardingrulesmanager.FlowConfig; import org.opendaylight.controller.forwardingrulesmanager.FlowEntry; @@ -57,9 +56,10 @@ import org.opendaylight.controller.sal.action.Controller; import org.opendaylight.controller.sal.action.Flood; import org.opendaylight.controller.sal.action.Output; import org.opendaylight.controller.sal.action.PopVlan; +import org.opendaylight.controller.sal.core.Config; import org.opendaylight.controller.sal.core.ContainerFlow; import org.opendaylight.controller.sal.core.IContainer; -import org.opendaylight.controller.sal.core.IContainerListener; +import org.opendaylight.controller.sal.core.IContainerLocalListener; import org.opendaylight.controller.sal.core.Node; import org.opendaylight.controller.sal.core.NodeConnector; import org.opendaylight.controller.sal.core.Property; @@ -97,12 +97,12 @@ import org.slf4j.LoggerFactory; public class ForwardingRulesManager implements IForwardingRulesManager, PortGroupChangeListener, - IContainerListener, + IContainerLocalListener, ISwitchManagerAware, IConfigurationContainerAware, IInventoryListener, IObjectReader, - ICacheUpdateAware, + ICacheUpdateAware, CommandProvider, IFlowProgrammerListener { private static final String NODEDOWN = "Node is Down"; @@ -213,7 +213,8 @@ public class ForwardingRulesManager implements * @return a Future object for monitoring the progress of the result, or * null in case the processing should take place locally */ - private Future distributeWorkOrder(FlowEntryInstall e, FlowEntryInstall u, UpdateType t) { + private FlowEntryDistributionOrderFutureTask distributeWorkOrder(FlowEntryInstall e, FlowEntryInstall u, + UpdateType t) { // A null entry it's an unexpected condition, anyway it's safe to keep // the handling local if (e == null) { @@ -543,11 +544,17 @@ public class ForwardingRulesManager implements * contain the unique id assigned to this request */ private Status modifyEntryInternal(FlowEntryInstall currentEntries, FlowEntryInstall newEntries, boolean async) { - Future futureStatus = distributeWorkOrder(currentEntries, newEntries, UpdateType.CHANGED); + FlowEntryDistributionOrderFutureTask futureStatus = + distributeWorkOrder(currentEntries, newEntries, UpdateType.CHANGED); if (futureStatus != null) { Status retStatus = new Status(StatusCode.UNDEFINED); try { retStatus = futureStatus.get(); + if (retStatus.getCode() + .equals(StatusCode.TIMEOUT)) { + // A timeout happened, lets cleanup the workMonitor + workMonitor.remove(futureStatus.getOrder()); + } } catch (InterruptedException e) { log.error("", e); } catch (ExecutionException e) { @@ -655,11 +662,16 @@ public class ForwardingRulesManager implements * contain the unique id assigned to this request */ private Status removeEntryInternal(FlowEntryInstall entry, boolean async) { - Future futureStatus = distributeWorkOrder(entry, null, UpdateType.REMOVED); + FlowEntryDistributionOrderFutureTask futureStatus = distributeWorkOrder(entry, null, UpdateType.REMOVED); if (futureStatus != null) { Status retStatus = new Status(StatusCode.UNDEFINED); try { retStatus = futureStatus.get(); + if (retStatus.getCode() + .equals(StatusCode.TIMEOUT)) { + // A timeout happened, lets cleanup the workMonitor + workMonitor.remove(futureStatus.getOrder()); + } } catch (InterruptedException e) { log.error("", e); } catch (ExecutionException e) { @@ -703,11 +715,16 @@ public class ForwardingRulesManager implements * contain the unique id assigned to this request */ private Status addEntriesInternal(FlowEntryInstall entry, boolean async) { - Future futureStatus = distributeWorkOrder(entry, null, UpdateType.ADDED); + FlowEntryDistributionOrderFutureTask futureStatus = distributeWorkOrder(entry, null, UpdateType.ADDED); if (futureStatus != null) { Status retStatus = new Status(StatusCode.UNDEFINED); try { retStatus = futureStatus.get(); + if (retStatus.getCode() + .equals(StatusCode.TIMEOUT)) { + // A timeout happened, lets cleanup the workMonitor + workMonitor.remove(futureStatus.getOrder()); + } } catch (InterruptedException e) { log.error("", e); } catch (ExecutionException e) { @@ -765,6 +782,16 @@ public class ForwardingRulesManager implements return true; } + private ConcurrentMap.Entry getStaticFlowEntry(String name, Node node) { + for (ConcurrentMap.Entry flowEntry : staticFlows.entrySet()) { + FlowConfig flowConfig = flowEntry.getValue(); + if (flowConfig.isByNameAndNodeIdEqual(name, node)) { + return flowEntry; + } + } + return null; + } + private void updateLocalDatabase(FlowEntryInstall entry, boolean add) { // Update the software view updateSwViewes(entry, add); @@ -1312,7 +1339,6 @@ public class ForwardingRulesManager implements retrieveCaches(); } - @SuppressWarnings("deprecation") private void allocateCaches() { if (this.clusterContainerService == null) { log.warn("Un-initialized clusterContainerService, can't create cache"); @@ -1340,9 +1366,6 @@ public class ForwardingRulesManager implements clusterContainerService.createCache("frm.staticFlows", EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL)); - clusterContainerService.createCache("frm.flowsSaveEvent", - EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL)); - clusterContainerService.createCache("frm.staticFlowsOrdinal", EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL)); @@ -1368,7 +1391,7 @@ public class ForwardingRulesManager implements } } - @SuppressWarnings({ "unchecked", "deprecation" }) + @SuppressWarnings({ "unchecked" }) private void retrieveCaches() { ConcurrentMap map; @@ -1930,10 +1953,9 @@ public class ForwardingRulesManager implements @Override public FlowConfig getStaticFlow(String name, Node node) { - for (ConcurrentMap.Entry entry : staticFlows.entrySet()) { - if (entry.getValue().isByNameAndNodeIdEqual(name, node)) { - return entry.getValue(); - } + ConcurrentMap.Entry entry = getStaticFlowEntry(name, node); + if(entry != null) { + return entry.getValue(); } return null; } @@ -2174,7 +2196,99 @@ public class ForwardingRulesManager implements @Override public void notifyNodeConnector(NodeConnector nodeConnector, UpdateType type, Map propMap) { + boolean updateStaticFlowCluster = false; + + switch (type) { + case ADDED: + break; + case CHANGED: + Config config = (propMap == null) ? null : (Config) propMap.get(Config.ConfigPropName); + if (config != null) { + switch (config.getValue()) { + case Config.ADMIN_DOWN: + log.trace("Port {} is administratively down: uninstalling interested flows", nodeConnector); + updateStaticFlowCluster = removeFlowsOnNodeConnectorDown(nodeConnector); + break; + case Config.ADMIN_UP: + log.trace("Port {} is administratively up: installing interested flows", nodeConnector); + updateStaticFlowCluster = installFlowsOnNodeConnectorUp(nodeConnector); + break; + case Config.ADMIN_UNDEF: + break; + default: + } + } + break; + case REMOVED: + // This is the case where a switch port is removed from the SDN agent space + log.trace("Port {} was removed from our control: uninstalling interested flows", nodeConnector); + updateStaticFlowCluster = removeFlowsOnNodeConnectorDown(nodeConnector); + break; + default: + + } + if (updateStaticFlowCluster) { + refreshClusterStaticFlowsStatus(nodeConnector.getNode()); + } + } + + /* + * It goes through the static flows configuration, it identifies the ones + * which have the specified node connector as input or output port and + * install them on the network node if they are marked to be installed in + * hardware and their status shows they were not installed yet + */ + private boolean installFlowsOnNodeConnectorUp(NodeConnector nodeConnector) { + boolean updated = false; + List flowConfigForNode = getStaticFlows(nodeConnector.getNode()); + for (FlowConfig flowConfig : flowConfigForNode) { + if (doesFlowContainNodeConnector(flowConfig.getFlow(), nodeConnector)) { + if (flowConfig.installInHw() && !flowConfig.getStatus().equals(SUCCESS)) { + Status status = this.installFlowEntry(flowConfig.getFlowEntry()); + if (!status.isSuccess()) { + flowConfig.setStatus(status.getDescription()); + } else { + flowConfig.setStatus(SUCCESS); + } + updated = true; + } + } + } + return updated; + } + + /* + * Remove from the network node all the flows which have the specified node + * connector as input or output port. If any of the flow entry is a static + * flow, it updates the correspondent configuration. + */ + private boolean removeFlowsOnNodeConnectorDown(NodeConnector nodeConnector) { + boolean updated = false; + List nodeFlowEntries = nodeFlows.get(nodeConnector.getNode()); + if (nodeFlowEntries == null) { + return updated; + } + for (FlowEntryInstall fei : new ArrayList(nodeFlowEntries)) { + if (doesFlowContainNodeConnector(fei.getInstall().getFlow(), nodeConnector)) { + Status status = this.removeEntryInternal(fei, true); + if (!status.isSuccess()) { + continue; + } + /* + * If the flow entry is a static flow, then update its + * configuration + */ + if (fei.getGroupName().equals(FlowConfig.STATICFLOWGROUP)) { + FlowConfig flowConfig = getStaticFlow(fei.getFlowName(), fei.getNode()); + if (flowConfig != null) { + flowConfig.setStatus(PORTREMOVED); + updated = true; + } + } + } + } + return updated; } private FlowConfig getDerivedFlowConfig(FlowConfig original, String configName, Short port) { @@ -2399,6 +2513,7 @@ public class ForwardingRulesManager implements log.warn("Dequeued null event"); continue; } + log.trace("Dequeued {} event", event.getClass().getSimpleName()); if (event instanceof NodeUpdateEvent) { NodeUpdateEvent update = (NodeUpdateEvent) event; Node node = update.getNode(); @@ -2462,6 +2577,14 @@ public class ForwardingRulesManager implements } else { log.warn("Not expected null WorkStatus", work); } + } else if (event instanceof ContainerFlowChangeEvent) { + /* + * Whether it is an addition or removal, we have to + * recompute the merged flows entries taking into + * account all the current container flows because + * flow merging is not an injective function + */ + updateFlowsContainerFlow(); } else { log.warn("Dequeued unknown event {}", event.getClass() .getSimpleName()); @@ -2560,12 +2683,8 @@ public class ForwardingRulesManager implements } log.trace("Container {}: Updating installed flows because of container flow change: {} {}", container.getName(), t, current); - /* - * Whether it is an addition or removal, we have to recompute the merged - * flows entries taking into account all the current container flows - * because flow merging is not an injective function - */ - updateFlowsContainerFlow(); + ContainerFlowChangeEvent ev = new ContainerFlowChangeEvent(previous, current, t); + pendingEvents.offer(ev); } @Override @@ -2578,57 +2697,22 @@ public class ForwardingRulesManager implements switch (t) { case REMOVED: - - List nodeFlowEntries = nodeFlows.get(nc.getNode()); - if (nodeFlowEntries == null) { - return; - } - for (FlowEntryInstall fei : new ArrayList(nodeFlowEntries)) { - if (doesFlowContainNodeConnector(fei.getInstall().getFlow(), nc)) { - Status status = this.removeEntryInternal(fei, true); - if (!status.isSuccess()) { - continue; - } - /* - * If the flow entry is a static flow, then update its - * configuration - */ - if (fei.getGroupName().equals(FlowConfig.STATICFLOWGROUP)) { - FlowConfig flowConfig = getStaticFlow(fei.getFlowName(), fei.getNode()); - if (flowConfig != null) { - flowConfig.setStatus(PORTREMOVED); - updateStaticFlowCluster = true; - } - } - } - } - if (updateStaticFlowCluster) { - refreshClusterStaticFlowsStatus(nc.getNode()); - } + log.trace("Port {} was removed from container: uninstalling interested flows", nc); + updateStaticFlowCluster = removeFlowsOnNodeConnectorDown(nc); break; case ADDED: - List flowConfigForNode = getStaticFlows(nc.getNode()); - for (FlowConfig flowConfig : flowConfigForNode) { - if (doesFlowContainNodeConnector(flowConfig.getFlow(), nc)) { - if (flowConfig.installInHw()) { - Status status = this.installFlowEntry(flowConfig.getFlowEntry()); - if (!status.isSuccess()) { - flowConfig.setStatus(status.getDescription()); - } else { - flowConfig.setStatus(SUCCESS); - } - updateStaticFlowCluster = true; - } - } - } - if (updateStaticFlowCluster) { - refreshClusterStaticFlowsStatus(nc.getNode()); - } + log.trace("Port {} was added to container: reinstall interested flows", nc); + updateStaticFlowCluster = installFlowsOnNodeConnectorUp(nc); + break; case CHANGED: break; default: } + + if (updateStaticFlowCluster) { + refreshClusterStaticFlowsStatus(nc.getNode()); + } } @Override @@ -2733,6 +2817,30 @@ public class ForwardingRulesManager implements return newEntry; } } + private class ContainerFlowChangeEvent extends FRMEvent { + private final ContainerFlow previous; + private final ContainerFlow current; + private final UpdateType type; + + public ContainerFlowChangeEvent(ContainerFlow previous, ContainerFlow current, UpdateType type) { + this.previous = previous; + this.current = current; + this.type = type; + } + + public ContainerFlow getPrevious() { + return this.previous; + } + + public ContainerFlow getCurrent() { + return this.current; + } + + public UpdateType getType() { + return this.type; + } + } + private class WorkStatusCleanup extends FRMEvent { private FlowEntryDistributionOrder fe; @@ -2930,6 +3038,36 @@ public class ForwardingRulesManager implements } } + public void _frmProcessErrorEvent(CommandInterpreter ci) throws UnknownHostException { + Node node = null; + long reqId = 0L; + String nodeId = ci.nextArgument(); + if (nodeId == null) { + ci.print("Node id not specified"); + return; + } + String requestId = ci.nextArgument(); + if (requestId == null) { + ci.print("Request id not specified"); + return; + } + try { + node = NodeCreator.createOFNode(Long.valueOf(nodeId)); + } catch (NumberFormatException e) { + ci.print("Node id not a number"); + return; + } + try { + reqId = Long.parseLong(requestId); + } catch (NumberFormatException e) { + ci.print("Request id not a number"); + return; + } + // null for error object is good enough for now + ErrorReportedEvent event = new ErrorReportedEvent(reqId, node, null); + this.processErrorEvent(event); + } + @Override public void flowRemoved(Node node, Flow flow) { log.trace("Received flow removed notification on {} for {}", node, flow); @@ -2982,16 +3120,32 @@ public class ForwardingRulesManager implements * mapping will have to be added in future */ FlowEntryInstall target = null; - for (FlowEntryInstall index : nodeFlows.get(node)) { - FlowEntryInstall entry = installedSwView.get(index); - if (entry.getRequestId() == rid) { - target = entry; - break; + List flowEntryInstallList = nodeFlows.get(node); + // flowEntryInstallList could be null. + // so check for it. + if(flowEntryInstallList != null) { + for (FlowEntryInstall index : flowEntryInstallList) { + FlowEntryInstall entry = installedSwView.get(index); + if(entry != null) { + if (entry.getRequestId() == rid) { + target = entry; + break; + } + } } } if (target != null) { // This was a flow install, update database this.updateLocalDatabase(target, false); + // also update the config + if(FlowConfig.STATICFLOWGROUP.equals(target.getGroupName())) { + ConcurrentMap.Entry staticFlowEntry = getStaticFlowEntry(target.getFlowName(),target.getNode()); + // staticFlowEntry should never be null. + // the null check is just an extra defensive check. + if(staticFlowEntry != null) { + staticFlows.remove(staticFlowEntry.getKey()); + } + } } // Notify listeners @@ -3079,7 +3233,7 @@ public class ForwardingRulesManager implements */ if (fe.getRequestorController() .equals(clusterContainerService.getMyAddress())) { - FlowEntryDistributionOrderFutureTask fet = workMonitor.get(fe); + FlowEntryDistributionOrderFutureTask fet = workMonitor.remove(fe); if (fet != null) { logsync.trace("workStatus response is for us {}", fe); // Signal we got the status