X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fcompatibility%2Fsal-compatibility%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fsal%2Fcompatibility%2FFlowProgrammerAdapter.xtend;h=35c641c45aae4e3d0b9f18233abd96228e8ca8fd;hp=450c7f1f239c4634b994fd6b71b941492321d8b4;hb=5913578c89e52af7a7b755d69b2bb5db0939cd27;hpb=eb538dbc16c0bb487f8ed70547b0df2c8c96c432 diff --git a/opendaylight/md-sal/compatibility/sal-compatibility/src/main/java/org/opendaylight/controller/sal/compatibility/FlowProgrammerAdapter.xtend b/opendaylight/md-sal/compatibility/sal-compatibility/src/main/java/org/opendaylight/controller/sal/compatibility/FlowProgrammerAdapter.xtend index 450c7f1f23..35c641c45a 100644 --- a/opendaylight/md-sal/compatibility/sal-compatibility/src/main/java/org/opendaylight/controller/sal/compatibility/FlowProgrammerAdapter.xtend +++ b/opendaylight/md-sal/compatibility/sal-compatibility/src/main/java/org/opendaylight/controller/sal/compatibility/FlowProgrammerAdapter.xtend @@ -1,12 +1,21 @@ package org.opendaylight.controller.sal.compatibility +import java.util.Map +import java.util.UUID import java.util.concurrent.ExecutionException +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.Future +import java.util.EnumSet import org.opendaylight.controller.sal.core.Node import org.opendaylight.controller.sal.flowprogrammer.Flow import org.opendaylight.controller.sal.flowprogrammer.IPluginInFlowProgrammerService import org.opendaylight.controller.sal.flowprogrammer.IPluginOutFlowProgrammerService import org.opendaylight.controller.sal.utils.Status import org.opendaylight.controller.sal.utils.StatusCode +import org.opendaylight.controller.clustering.services.CacheExistException +import org.opendaylight.controller.clustering.services.IClusterGlobalServices +import org.opendaylight.controller.clustering.services.IClusterServices + import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowAdded import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowRemoved import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SwitchFlowRemoved @@ -20,19 +29,14 @@ import org.slf4j.LoggerFactory import org.opendaylight.controller.sal.binding.api.data.DataBrokerService import org.opendaylight.controller.md.sal.common.api.TransactionStatus -import org.opendaylight.controller.md.sal.common.api.data.DataModification import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey -import org.opendaylight.yangtools.yang.binding.DataObject import org.opendaylight.yangtools.yang.binding.InstanceIdentifier -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInput import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowId import static extension org.opendaylight.controller.sal.compatibility.MDFlowMapping.* @@ -43,6 +47,7 @@ import static extension org.opendaylight.controller.sal.compatibility.ToSalConve class FlowProgrammerAdapter implements IPluginInFlowProgrammerService, SalFlowListener { private static val LOG = LoggerFactory.getLogger(FlowProgrammerAdapter); + private static val CACHE_NAME = "flowprogrammeradapter.flowtoid"; @Property private SalFlowService delegate; @@ -53,39 +58,38 @@ class FlowProgrammerAdapter implements IPluginInFlowProgrammerService, SalFlowLi @Property private IPluginOutFlowProgrammerService flowProgrammerPublisher; + @Property + private IClusterGlobalServices clusterGlobalServices; + + + @Property + private Map flowToFlowId = new ConcurrentHashMap(); + + override addFlow(Node node, Flow flow) { - return addFlowAsync(node,flow,0) + return toFutureStatus(internalAddFlowAsync(node,flow,0)); } override modifyFlow(Node node, Flow oldFlow, Flow newFlow) { - return modifyFlowAsync(node, oldFlow,newFlow,0) + return toFutureStatus(internalModifyFlowAsync(node, oldFlow,newFlow,0)); } override removeFlow(Node node, Flow flow) { - return removeFlowAsync(node, flow,0); + return toFutureStatus(internalRemoveFlowAsync(node, flow,0)); } override addFlowAsync(Node node, Flow flow, long rid) { - writeFlow(flow.toMDFlow, new NodeKey(new NodeId(node.getNodeIDString()))); + internalAddFlowAsync(node, flow, rid); return toStatus(true); } override modifyFlowAsync(Node node, Flow oldFlow, Flow newFlow, long rid) { - writeFlow(newFlow.toMDFlow, new NodeKey(new NodeId(node.getNodeIDString()))); + internalModifyFlowAsync(node, oldFlow, newFlow, rid); return toStatus(true); } - override removeFlowAsync(Node node, Flow adflow, long rid) { - val flow = adflow.toMDFlow; - val modification = this._dataBrokerService.beginTransaction(); - val flowPath = InstanceIdentifier.builder(Nodes) - .child(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node, new NodeKey(new NodeId(node.getNodeIDString()))) - .augmentation(FlowCapableNode) - .child(Table, new TableKey(flow.getTableId())) - .child(org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow, new FlowKey(flow.id)) - .build; - modification.removeConfigurationData(flowPath); - val commitFuture = modification.commit(); + override removeFlowAsync(Node node, Flow flow, long rid) { + internalRemoveFlowAsync(node, flow, rid); return toStatus(true); } @@ -114,27 +118,6 @@ class FlowProgrammerAdapter implements IPluginInFlowProgrammerService, SalFlowLi } } - - private def writeFlow(org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow flow, NodeKey nodeKey) { - val modification = this._dataBrokerService.beginTransaction(); - val flowPath = InstanceIdentifier.builder(Nodes) - .child(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node, nodeKey) - .augmentation(FlowCapableNode) - .child(Table, new TableKey(flow.getTableId())) - .child(org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow, new FlowKey(flow.id)) - .build; - modification.putConfigurationData(flowPath, flow); - val commitFuture = modification.commit(); - try { - val result = commitFuture.get(); - val status = result.getResult(); - } catch (InterruptedException e) { - LOG.error(e.getMessage(), e); - } catch (ExecutionException e) { - LOG.error(e.getMessage(), e); - } - } - public static def toStatus(RpcResult result) { return toStatus(result.isSuccessful()); } @@ -177,4 +160,90 @@ class FlowProgrammerAdapter implements IPluginInFlowProgrammerService, SalFlowLi NodeExperimenterErrorNotification notification) { // NOOP : Not supported by AD SAL } + + private def Future> writeFlowAsync(org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow flow, NodeKey nodeKey){ + val modification = this._dataBrokerService.beginTransaction(); + val flowPath = InstanceIdentifier.builder(Nodes) + .child(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node, nodeKey) + .augmentation(FlowCapableNode) + .child(Table, new TableKey(flow.getTableId())) + .child(org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow, new FlowKey(flow.id)) + .build; + modification.putConfigurationData(flowPath, flow); + return modification.commit(); + } + + private def Future> internalAddFlowAsync(Node node, Flow flow, long rid){ + var flowId = getCache().get(flow); + if(flowId != null) { + removeFlow(node, flow); + return internalAddFlowAsync(node, flow, rid); + } + + flowId = UUID.randomUUID(); + getCache().put(flow, flowId); + + return writeFlowAsync(flow.toMDFlow(flowId.toString()), new NodeKey(new NodeId(node.getNodeIDString()))); + } + + private def Future> internalModifyFlowAsync(Node node, Flow oldFlow, Flow newFlow, long rid) { + val flowId = getCache().remove(oldFlow); + if(flowId == null){ + throw new IllegalArgumentException("oldFlow is unknown"); + } + + getCache().put(newFlow, flowId); + return writeFlowAsync(newFlow.toMDFlow(flowId.toString()), new NodeKey(new NodeId(node.getNodeIDString()))); + } + + + private def Future> internalRemoveFlowAsync(Node node, Flow adflow, long rid){ + val flowId = getCache().remove(adflow); + if(flowId == null){ + throw new IllegalArgumentException("adflow is unknown"); + } + val flow = adflow.toMDFlow(flowId.toString()); + val modification = this._dataBrokerService.beginTransaction(); + val flowPath = InstanceIdentifier.builder(Nodes) + .child(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node, new NodeKey(new NodeId(node.getNodeIDString()))) + .augmentation(FlowCapableNode) + .child(Table, new TableKey(flow.getTableId())) + .child(org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow, new FlowKey(flow.id)) + .build; + modification.removeConfigurationData(flowPath); + return modification.commit(); + } + + private def toFutureStatus(Future> future){ + try { + val result = future.get(); + return toStatus(result); + } catch (InterruptedException e) { + return processException(e); + } catch (ExecutionException e) { + return processException(e); + } catch (Exception e){ + processException(e); + } + return toStatus(false); + } + + private def Map getCache(){ + if(clusterGlobalServices == null){ + return new ConcurrentHashMap(); + } + + var cache = clusterGlobalServices.getCache(CACHE_NAME); + + if(cache == null) { + try { + cache = clusterGlobalServices.createCache(CACHE_NAME, EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL)); + } catch (CacheExistException e) { + cache = clusterGlobalServices.getCache(CACHE_NAME); + } + } + return cache as Map; + + } + }