X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fcompatibility%2Fsal-compatibility%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fsal%2Fcompatibility%2FFlowProgrammerAdapter.xtend;h=8a0874ee3123da1d919db48a5d32e476afb839fc;hp=7d9c93908ec8484d502d6ab91d1bd26080c631cb;hb=8783c248f832b94b36b61d12230a87d973461f5c;hpb=83291dd59ee117454596081f09e1248e89551020 diff --git a/opendaylight/md-sal/compatibility/sal-compatibility/src/main/java/org/opendaylight/controller/sal/compatibility/FlowProgrammerAdapter.xtend b/opendaylight/md-sal/compatibility/sal-compatibility/src/main/java/org/opendaylight/controller/sal/compatibility/FlowProgrammerAdapter.xtend index 7d9c93908e..8a0874ee31 100644 --- a/opendaylight/md-sal/compatibility/sal-compatibility/src/main/java/org/opendaylight/controller/sal/compatibility/FlowProgrammerAdapter.xtend +++ b/opendaylight/md-sal/compatibility/sal-compatibility/src/main/java/org/opendaylight/controller/sal/compatibility/FlowProgrammerAdapter.xtend @@ -1,21 +1,52 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ package org.opendaylight.controller.sal.compatibility +import java.util.Map +import java.util.UUID import java.util.concurrent.ExecutionException +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.Future +import java.util.EnumSet import org.opendaylight.controller.sal.core.Node import org.opendaylight.controller.sal.flowprogrammer.Flow import org.opendaylight.controller.sal.flowprogrammer.IPluginInFlowProgrammerService import org.opendaylight.controller.sal.flowprogrammer.IPluginOutFlowProgrammerService import org.opendaylight.controller.sal.utils.Status import org.opendaylight.controller.sal.utils.StatusCode +import org.opendaylight.controller.clustering.services.CacheExistException +import org.opendaylight.controller.clustering.services.IClusterGlobalServices +import org.opendaylight.controller.clustering.services.IClusterServices + import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowAdded import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowRemoved +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SwitchFlowRemoved import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowUpdated import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowListener import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowService +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.NodeErrorNotification +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.NodeExperimenterErrorNotification import org.opendaylight.yangtools.yang.common.RpcResult import org.slf4j.LoggerFactory -import static org.opendaylight.controller.sal.compatibility.MDFlowMapping.* +import org.opendaylight.controller.sal.binding.api.data.DataBrokerService +import org.opendaylight.controller.md.sal.common.api.TransactionStatus +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey +import org.opendaylight.yangtools.yang.binding.InstanceIdentifier +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId + + +import static extension org.opendaylight.controller.sal.compatibility.MDFlowMapping.* import static extension org.opendaylight.controller.sal.compatibility.NodeMapping.* import static extension org.opendaylight.controller.sal.compatibility.ToSalConversionsUtils.* @@ -23,67 +54,55 @@ import static extension org.opendaylight.controller.sal.compatibility.ToSalConve class FlowProgrammerAdapter implements IPluginInFlowProgrammerService, SalFlowListener { private static val LOG = LoggerFactory.getLogger(FlowProgrammerAdapter); + private static val CACHE_NAME = "flowprogrammeradapter.flowtoid"; @Property private SalFlowService delegate; + + @Property + private DataBrokerService dataBrokerService; @Property private IPluginOutFlowProgrammerService flowProgrammerPublisher; + @Property + private IClusterGlobalServices clusterGlobalServices; + + + @Property + private Map flowToFlowId = new ConcurrentHashMap(); + + override addFlow(Node node, Flow flow) { - val input = addFlowInput(node, flow); - val future = delegate.addFlow(input); - try { - val result = future.get(); - return toStatus(result); // how get status from result? conversion? - } catch (Exception e) { - return processException(e); - } + return toFutureStatus(internalAddFlowAsync(node,flow,0)); } override modifyFlow(Node node, Flow oldFlow, Flow newFlow) { - val input = updateFlowInput(node, oldFlow, newFlow); - val future = delegate.updateFlow(input); - try { - val result = future.get(); - return toStatus(result); - } catch (Exception e) { - return processException(e); - } + return toFutureStatus(internalModifyFlowAsync(node, oldFlow,newFlow,0)); } override removeFlow(Node node, Flow flow) { - val input = removeFlowInput(node, flow); - val future = delegate.removeFlow(input); - - try { - val result = future.get(); - return toStatus(result); - } catch (Exception e) { - return processException(e); - } + return toFutureStatus(internalRemoveFlowAsync(node, flow,0)); } override addFlowAsync(Node node, Flow flow, long rid) { - val input = addFlowInput(node, flow); - delegate.addFlow(input); - return new Status(StatusCode.SUCCESS); + internalAddFlowAsync(node, flow, rid); + return toStatus(true); } override modifyFlowAsync(Node node, Flow oldFlow, Flow newFlow, long rid) { - val input = updateFlowInput(node, oldFlow, newFlow); - delegate.updateFlow(input); - return new Status(StatusCode.SUCCESS); + internalModifyFlowAsync(node, oldFlow, newFlow, rid); + return toStatus(true); } override removeFlowAsync(Node node, Flow flow, long rid) { - val input = removeFlowInput(node, flow); - delegate.removeFlow(input); - return new Status(StatusCode.SUCCESS); + internalRemoveFlowAsync(node, flow, rid); + return toStatus(true); } override removeAllFlows(Node node) { - throw new UnsupportedOperationException("Not present in MD-SAL"); + // I know this looks like a copout... but its exactly what the legacy OFplugin did + return new Status(StatusCode.SUCCESS); } override syncSendBarrierMessage(Node node) { @@ -98,13 +117,17 @@ class FlowProgrammerAdapter implements IPluginInFlowProgrammerService, SalFlowLi return null; } - public static def toStatus(RpcResult result) { - if (result.isSuccessful()) { + private static def toStatus(boolean successful) { + if (successful) { return new Status(StatusCode.SUCCESS); } else { return new Status(StatusCode.INTERNALERROR); } } + + public static def toStatus(RpcResult result) { + return toStatus(result.isSuccessful()); + } private static dispatch def Status processException(InterruptedException e) { LOG.error("Interruption occured during processing flow",e); @@ -125,11 +148,122 @@ class FlowProgrammerAdapter implements IPluginInFlowProgrammerService, SalFlowLi } override onFlowRemoved(FlowRemoved notification) { - flowProgrammerPublisher.flowRemoved(notification.node.toADNode,notification.toFlow()); + if(notification != null && notification.node != null) { + val adNode = notification.node.toADNode + if(adNode != null) { + flowProgrammerPublisher.flowRemoved(adNode,notification.toFlow(adNode)); + } + } } override onFlowUpdated(FlowUpdated notification) { // NOOP : Not supported by AD SAL } + override onSwitchFlowRemoved(SwitchFlowRemoved notification) { + // NOOP : Not supported by AD SAL + } + + override onNodeErrorNotification(NodeErrorNotification notification) { + // NOOP : Not supported by AD SAL + } + + override onNodeExperimenterErrorNotification( + NodeExperimenterErrorNotification notification) { + // NOOP : Not supported by AD SAL + } + + private def Future> writeFlowAsync(org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow flow, NodeKey nodeKey){ + val modification = this._dataBrokerService.beginTransaction(); + val flowPath = InstanceIdentifier.builder(Nodes) + .child(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node, nodeKey) + .augmentation(FlowCapableNode) + .child(Table, new TableKey(flow.getTableId())) + .child(org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow, new FlowKey(flow.id)) + .build; + modification.putConfigurationData(flowPath, flow); + return modification.commit(); + } + + private def Future> internalAddFlowAsync(Node node, Flow flow, long rid){ + var flowId = getCache().get(flow); + if(flowId != null) { + removeFlow(node, flow); + return internalAddFlowAsync(node, flow, rid); + } + + flowId = UUID.randomUUID(); + getCache().put(flow, flowId); + + return writeFlowAsync(flow.toMDFlow(flowId.toString()), new NodeKey(new NodeId(node.getNodeIDString()))); + } + + private def Future> internalModifyFlowAsync(Node node, Flow oldFlow, Flow newFlow, long rid) { + var flowId = getCache().remove(oldFlow); + if(flowId == null){ + LOG.error("oldFlow not found in cache : " + oldFlow.hashCode); + flowId = UUID.randomUUID(); + getCache().put(oldFlow, flowId); + } + + getCache().put(newFlow, flowId); + return writeFlowAsync(newFlow.toMDFlow(flowId.toString()), new NodeKey(new NodeId(node.getNodeIDString()))); + } + + + private def Future> internalRemoveFlowAsync(Node node, Flow adflow, long rid){ + val flowId = getCache().remove(adflow); + if(flowId == null){ + //throw new IllegalArgumentException("adflow not found in cache : " + adflow.hashCode); + LOG.error("adflow not found in cache : " + adflow.hashCode); + return null; + } + val flow = adflow.toMDFlow(flowId.toString()); + val modification = this._dataBrokerService.beginTransaction(); + val flowPath = InstanceIdentifier.builder(Nodes) + .child(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node, new NodeKey(new NodeId(node.getNodeIDString()))) + .augmentation(FlowCapableNode) + .child(Table, new TableKey(flow.getTableId())) + .child(org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow, new FlowKey(flow.id)) + .build; + modification.removeConfigurationData(flowPath); + return modification.commit(); + } + + private def toFutureStatus(Future> future){ + if(future == null){ + return toStatus(true); + } + + try { + val result = future.get(); + return toStatus(result); + } catch (InterruptedException e) { + return processException(e); + } catch (ExecutionException e) { + return processException(e); + } catch (Exception e){ + processException(e); + } + return toStatus(false); + } + + private def Map getCache(){ + if(clusterGlobalServices == null){ + return new ConcurrentHashMap(); + } + + var cache = clusterGlobalServices.getCache(CACHE_NAME); + + if(cache == null) { + try { + cache = clusterGlobalServices.createCache(CACHE_NAME, EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL)); + } catch (CacheExistException e) { + cache = clusterGlobalServices.getCache(CACHE_NAME); + } + } + return cache as Map; + + } + }