X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fforwardingrulesmanager%2Fimplementation%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fforwardingrulesmanager%2Finternal%2FForwardingRulesManager.java;h=fc18079b0afb4537bb7a38a5da3d01fe3736b07d;hb=0bbb1613ee4c4c3f0a4fbbf8b9dc7ebcfbafde34;hp=d08b24d0783f796f2bda87da71fa34833d37411f;hpb=cb6fd7676c31ccae1f13077a48bc2e36e67089b4;p=controller.git diff --git a/opendaylight/forwardingrulesmanager/implementation/src/main/java/org/opendaylight/controller/forwardingrulesmanager/internal/ForwardingRulesManager.java b/opendaylight/forwardingrulesmanager/implementation/src/main/java/org/opendaylight/controller/forwardingrulesmanager/internal/ForwardingRulesManager.java index d08b24d078..fc18079b0a 100644 --- a/opendaylight/forwardingrulesmanager/implementation/src/main/java/org/opendaylight/controller/forwardingrulesmanager/internal/ForwardingRulesManager.java +++ b/opendaylight/forwardingrulesmanager/implementation/src/main/java/org/opendaylight/controller/forwardingrulesmanager/internal/ForwardingRulesManager.java @@ -39,6 +39,7 @@ import org.opendaylight.controller.clustering.services.ICacheUpdateAware; import org.opendaylight.controller.clustering.services.IClusterContainerServices; import org.opendaylight.controller.clustering.services.IClusterServices; import org.opendaylight.controller.configuration.IConfigurationContainerAware; +import org.opendaylight.controller.connectionmanager.ConnectionLocality; import org.opendaylight.controller.connectionmanager.IConnectionManager; import org.opendaylight.controller.forwardingrulesmanager.FlowConfig; import org.opendaylight.controller.forwardingrulesmanager.FlowEntry; @@ -201,59 +202,6 @@ public class ForwardingRulesManager implements private ConcurrentMap workMonitor = new ConcurrentHashMap(); - /* - * Create an executor pool to create the distributionOrder, this is a stop - * gap solution caused by an issue with non-transactional caches in the - * implementation we use, being currently worked on. It has been noticed in - * fact that when non-transactional caches are being used sometime the key - * are no distributed to all the nodes properly. To workaround the issue - * transactional caches are being used, but there was a reason for using - * non-transactional caches to start with, in fact we needed to be able in - * the context of a northbound transaction to program the FRM entries - * irrespective of the fact that transaction would commit or no else we - * would not be able to achieve the entry programming and implement the - * scheme for recovery from network element failures. Bottom line, now in - * order to make sure an update on a transactional cache goes out while in a - * transaction that need to be initiated by a different thread. - */ - private ExecutorService executor; - - class DistributeOrderCallable implements Callable> { - private FlowEntryInstall e; - private FlowEntryInstall u; - private UpdateType t; - DistributeOrderCallable(FlowEntryInstall e, FlowEntryInstall u, UpdateType t) { - this.e = e; - this.u = u; - this.t = t; - } - - @Override - public Future call() throws Exception { - if (e == null || t == null) { - logsync.error("Unexpected null Entry up update type"); - return null; - } - // Create the work order and distribute it - FlowEntryDistributionOrder fe = - new FlowEntryDistributionOrder(e, t, clusterContainerService.getMyAddress()); - // First create the monitor job - FlowEntryDistributionOrderFutureTask ret = new FlowEntryDistributionOrderFutureTask(fe); - logsync.trace("Node {} not local so sending fe {}", e.getNode(), fe); - workMonitor.put(fe, ret); - if (t.equals(UpdateType.CHANGED)) { - // Then distribute the work - workOrder.put(fe, u); - } else { - // Then distribute the work - workOrder.put(fe, e); - } - logsync.trace("WorkOrder requested"); - // Now create an Handle to monitor the execution of the operation - return ret; - } - } - /** * @param e * Entry being installed/updated/removed @@ -273,25 +221,27 @@ public class ForwardingRulesManager implements } Node n = e.getNode(); - if (!connectionManager.isLocal(n)) { - Callable> worker = new DistributeOrderCallable(e, u, t); - if (worker != null) { - Future> workerRes = this.executor.submit(worker); - try { - return workerRes.get(); - } catch (InterruptedException e1) { - // we where interrupted, not a big deal. - return null; - } catch (ExecutionException e1) { - logsync.error( - "We got an execution exception {} we cannot much, so returning we don't have nothing to wait for", - e); - return null; - } + if (connectionManager.getLocalityStatus(n) == ConnectionLocality.NOT_LOCAL) { + // Create the work order and distribute it + FlowEntryDistributionOrder fe = + new FlowEntryDistributionOrder(e, t, clusterContainerService.getMyAddress()); + // First create the monitor job + FlowEntryDistributionOrderFutureTask ret = new FlowEntryDistributionOrderFutureTask(fe); + logsync.trace("Node {} not local so sending fe {}", n, fe); + workMonitor.put(fe, ret); + if (t.equals(UpdateType.CHANGED)) { + // Then distribute the work + workOrder.put(fe, u); + } else { + // Then distribute the work + workOrder.put(fe, e); } + logsync.trace("WorkOrder requested"); + // Now create an Handle to monitor the execution of the operation + return ret; } - logsync.trace("LOCAL Node {} so processing Entry:{} UpdateType:{}", n, e, t); + logsync.trace("Node {} could be local. so processing Entry:{} UpdateType:{}", n, e, t); return null; } @@ -1406,10 +1356,10 @@ public class ForwardingRulesManager implements EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL)); clusterContainerService.createCache(WORKSTATUSCACHE, - EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL)); + EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL)); clusterContainerService.createCache(WORKORDERCACHE, - EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL)); + EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL)); } catch (CacheConfigException cce) { log.error("CacheConfigException"); @@ -1867,9 +1817,13 @@ public class ForwardingRulesManager implements } } if (target != null) { - // Program the network node - Status status = (target.installInHw()) ? this.uninstallFlowEntry(target.getFlowEntry()) : this - .installFlowEntry(target.getFlowEntry()); + Status status = target.validate(container); + if (!status.isSuccess()) { + log.warn(status.getDescription()); + return status; + } + status = (target.installInHw()) ? this.uninstallFlowEntry(target.getFlowEntry()) : this + .installFlowEntry(target.getFlowEntry()); if (status.isSuccess()) { // Update Configuration database target.setStatus(SUCCESS); @@ -2096,51 +2050,79 @@ public class ForwardingRulesManager implements addStaticFlowInternal(allowARP, true); // skip validation on internal static flow name } + /** + * (non-Javadoc) + * + * @see org.opendaylight.controller.switchmanager.ISwitchManagerAware#modeChangeNotify(org.opendaylight.controller.sal.core.Node, + * boolean) + * + * This method can be called from within the OSGi framework context, + * given the programming operation can take sometime, it not good + * pratice to have in it's context operations that can take time, + * hence moving off to a different thread for async processing. + */ + private ExecutorService executor; @Override - public void modeChangeNotify(Node node, boolean proactive) { - List defaultConfigs = new ArrayList(); - - List puntAction = new ArrayList(); - puntAction.add(ActionType.CONTROLLER.toString()); - - FlowConfig allowARP = new FlowConfig(); - allowARP.setInstallInHw(true); - allowARP.setName(FlowConfig.INTERNALSTATICFLOWBEGIN + "Punt ARP" + FlowConfig.INTERNALSTATICFLOWEND); - allowARP.setPriority("1"); - allowARP.setNode(node); - allowARP.setEtherType("0x" + Integer.toHexString(EtherTypes.ARP.intValue()).toUpperCase()); - allowARP.setActions(puntAction); - defaultConfigs.add(allowARP); - - FlowConfig allowLLDP = new FlowConfig(); - allowLLDP.setInstallInHw(true); - allowLLDP.setName(FlowConfig.INTERNALSTATICFLOWBEGIN + "Punt LLDP" + FlowConfig.INTERNALSTATICFLOWEND); - allowLLDP.setPriority("1"); - allowLLDP.setNode(node); - allowLLDP.setEtherType("0x" + Integer.toHexString(EtherTypes.LLDP.intValue()).toUpperCase()); - allowLLDP.setActions(puntAction); - defaultConfigs.add(allowLLDP); - - List dropAction = new ArrayList(); - dropAction.add(ActionType.DROP.toString()); - - FlowConfig dropAllConfig = new FlowConfig(); - dropAllConfig.setInstallInHw(true); - dropAllConfig.setName(FlowConfig.INTERNALSTATICFLOWBEGIN + "Catch-All Drop" + FlowConfig.INTERNALSTATICFLOWEND); - dropAllConfig.setPriority("0"); - dropAllConfig.setNode(node); - dropAllConfig.setActions(dropAction); - defaultConfigs.add(dropAllConfig); - - log.info("Forwarding mode for node {} set to {}", node, (proactive ? "proactive" : "reactive")); - for (FlowConfig fc : defaultConfigs) { - Status status = (proactive) ? addStaticFlowInternal(fc, false) : removeStaticFlow(fc); - if (status.isSuccess()) { - log.info("{} Proactive Static flow: {}", (proactive ? "Installed" : "Removed"), fc.getName()); - } else { - log.warn("Failed to {} Proactive Static flow: {}", (proactive ? "install" : "remove"), fc.getName()); + public void modeChangeNotify(final Node node, final boolean proactive) { + Callable modeChangeCallable = new Callable() { + @Override + public Status call() throws Exception { + List defaultConfigs = new ArrayList(); + + List puntAction = new ArrayList(); + puntAction.add(ActionType.CONTROLLER.toString()); + + FlowConfig allowARP = new FlowConfig(); + allowARP.setInstallInHw(true); + allowARP.setName(FlowConfig.INTERNALSTATICFLOWBEGIN + "Punt ARP" + FlowConfig.INTERNALSTATICFLOWEND); + allowARP.setPriority("1"); + allowARP.setNode(node); + allowARP.setEtherType("0x" + Integer.toHexString(EtherTypes.ARP.intValue()) + .toUpperCase()); + allowARP.setActions(puntAction); + defaultConfigs.add(allowARP); + + FlowConfig allowLLDP = new FlowConfig(); + allowLLDP.setInstallInHw(true); + allowLLDP.setName(FlowConfig.INTERNALSTATICFLOWBEGIN + "Punt LLDP" + FlowConfig.INTERNALSTATICFLOWEND); + allowLLDP.setPriority("1"); + allowLLDP.setNode(node); + allowLLDP.setEtherType("0x" + Integer.toHexString(EtherTypes.LLDP.intValue()) + .toUpperCase()); + allowLLDP.setActions(puntAction); + defaultConfigs.add(allowLLDP); + + List dropAction = new ArrayList(); + dropAction.add(ActionType.DROP.toString()); + + FlowConfig dropAllConfig = new FlowConfig(); + dropAllConfig.setInstallInHw(true); + dropAllConfig.setName(FlowConfig.INTERNALSTATICFLOWBEGIN + "Catch-All Drop" + + FlowConfig.INTERNALSTATICFLOWEND); + dropAllConfig.setPriority("0"); + dropAllConfig.setNode(node); + dropAllConfig.setActions(dropAction); + defaultConfigs.add(dropAllConfig); + + log.info("Forwarding mode for node {} set to {}", node, (proactive ? "proactive" : "reactive")); + for (FlowConfig fc : defaultConfigs) { + Status status = (proactive) ? addStaticFlowInternal(fc, false) : removeStaticFlow(fc); + if (status.isSuccess()) { + log.info("{} Proactive Static flow: {}", (proactive ? "Installed" : "Removed"), fc.getName()); + } else { + log.warn("Failed to {} Proactive Static flow: {}", (proactive ? "install" : "remove"), + fc.getName()); + } + } + return new Status(StatusCode.SUCCESS); } - } + }; + + /* + * Execute the work outside the caller context, this could be an + * expensive operation and we don't want to block the caller for it. + */ + this.executor.submit(modeChangeCallable); } /** @@ -2480,6 +2462,14 @@ public class ForwardingRulesManager implements } else { log.warn("Not expected null WorkStatus", work); } + } else if (event instanceof ContainerFlowChangeEvent) { + /* + * Whether it is an addition or removal, we have to + * recompute the merged flows entries taking into + * account all the current container flows because + * flow merging is not an injective function + */ + updateFlowsContainerFlow(); } else { log.warn("Dequeued unknown event {}", event.getClass() .getSimpleName()); @@ -2578,12 +2568,8 @@ public class ForwardingRulesManager implements } log.trace("Container {}: Updating installed flows because of container flow change: {} {}", container.getName(), t, current); - /* - * Whether it is an addition or removal, we have to recompute the merged - * flows entries taking into account all the current container flows - * because flow merging is not an injective function - */ - updateFlowsContainerFlow(); + ContainerFlowChangeEvent ev = new ContainerFlowChangeEvent(previous, current, t); + pendingEvents.offer(ev); } @Override @@ -2751,6 +2737,30 @@ public class ForwardingRulesManager implements return newEntry; } } + private class ContainerFlowChangeEvent extends FRMEvent { + private final ContainerFlow previous; + private final ContainerFlow current; + private final UpdateType type; + + public ContainerFlowChangeEvent(ContainerFlow previous, ContainerFlow current, UpdateType type) { + this.previous = previous; + this.current = current; + this.type = type; + } + + public ContainerFlow getPrevious() { + return this.previous; + } + + public ContainerFlow getCurrent() { + return this.current; + } + + public UpdateType getType() { + return this.type; + } + } + private class WorkStatusCleanup extends FRMEvent { private FlowEntryDistributionOrder fe; @@ -3078,7 +3088,7 @@ public class ForwardingRulesManager implements return; } Node n = fei.getNode(); - if (connectionManager.isLocal(n)) { + if (connectionManager.getLocalityStatus(n) == ConnectionLocality.LOCAL) { logsync.trace("workOrder for fe {} processed locally", fe); // I'm the controller in charge for the request, queue it for // processing