X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fcompatibility%2Fsal-compatibility%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fsal%2Fcompatibility%2FFlowProgrammerAdapter.xtend;h=8a0874ee3123da1d919db48a5d32e476afb839fc;hp=450c7f1f239c4634b994fd6b71b941492321d8b4;hb=a462a9efaffbfe98040ff46740ed071aeb3e511d;hpb=f46e0c0ac627304099c17783696bc0b5c9f622e1 diff --git a/opendaylight/md-sal/compatibility/sal-compatibility/src/main/java/org/opendaylight/controller/sal/compatibility/FlowProgrammerAdapter.xtend b/opendaylight/md-sal/compatibility/sal-compatibility/src/main/java/org/opendaylight/controller/sal/compatibility/FlowProgrammerAdapter.xtend index 450c7f1f23..8a0874ee31 100644 --- a/opendaylight/md-sal/compatibility/sal-compatibility/src/main/java/org/opendaylight/controller/sal/compatibility/FlowProgrammerAdapter.xtend +++ b/opendaylight/md-sal/compatibility/sal-compatibility/src/main/java/org/opendaylight/controller/sal/compatibility/FlowProgrammerAdapter.xtend @@ -1,12 +1,28 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ package org.opendaylight.controller.sal.compatibility +import java.util.Map +import java.util.UUID import java.util.concurrent.ExecutionException +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.Future +import java.util.EnumSet import org.opendaylight.controller.sal.core.Node import org.opendaylight.controller.sal.flowprogrammer.Flow import org.opendaylight.controller.sal.flowprogrammer.IPluginInFlowProgrammerService import org.opendaylight.controller.sal.flowprogrammer.IPluginOutFlowProgrammerService import org.opendaylight.controller.sal.utils.Status import org.opendaylight.controller.sal.utils.StatusCode +import org.opendaylight.controller.clustering.services.CacheExistException +import org.opendaylight.controller.clustering.services.IClusterGlobalServices +import org.opendaylight.controller.clustering.services.IClusterServices + import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowAdded import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowRemoved import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SwitchFlowRemoved @@ -20,19 +36,14 @@ import org.slf4j.LoggerFactory import org.opendaylight.controller.sal.binding.api.data.DataBrokerService import org.opendaylight.controller.md.sal.common.api.TransactionStatus -import org.opendaylight.controller.md.sal.common.api.data.DataModification import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey -import org.opendaylight.yangtools.yang.binding.DataObject import org.opendaylight.yangtools.yang.binding.InstanceIdentifier -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInput import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowId import static extension org.opendaylight.controller.sal.compatibility.MDFlowMapping.* @@ -43,6 +54,7 @@ import static extension org.opendaylight.controller.sal.compatibility.ToSalConve class FlowProgrammerAdapter implements IPluginInFlowProgrammerService, SalFlowListener { private static val LOG = LoggerFactory.getLogger(FlowProgrammerAdapter); + private static val CACHE_NAME = "flowprogrammeradapter.flowtoid"; @Property private SalFlowService delegate; @@ -53,39 +65,38 @@ class FlowProgrammerAdapter implements IPluginInFlowProgrammerService, SalFlowLi @Property private IPluginOutFlowProgrammerService flowProgrammerPublisher; + @Property + private IClusterGlobalServices clusterGlobalServices; + + + @Property + private Map flowToFlowId = new ConcurrentHashMap(); + + override addFlow(Node node, Flow flow) { - return addFlowAsync(node,flow,0) + return toFutureStatus(internalAddFlowAsync(node,flow,0)); } override modifyFlow(Node node, Flow oldFlow, Flow newFlow) { - return modifyFlowAsync(node, oldFlow,newFlow,0) + return toFutureStatus(internalModifyFlowAsync(node, oldFlow,newFlow,0)); } override removeFlow(Node node, Flow flow) { - return removeFlowAsync(node, flow,0); + return toFutureStatus(internalRemoveFlowAsync(node, flow,0)); } override addFlowAsync(Node node, Flow flow, long rid) { - writeFlow(flow.toMDFlow, new NodeKey(new NodeId(node.getNodeIDString()))); + internalAddFlowAsync(node, flow, rid); return toStatus(true); } override modifyFlowAsync(Node node, Flow oldFlow, Flow newFlow, long rid) { - writeFlow(newFlow.toMDFlow, new NodeKey(new NodeId(node.getNodeIDString()))); + internalModifyFlowAsync(node, oldFlow, newFlow, rid); return toStatus(true); } - override removeFlowAsync(Node node, Flow adflow, long rid) { - val flow = adflow.toMDFlow; - val modification = this._dataBrokerService.beginTransaction(); - val flowPath = InstanceIdentifier.builder(Nodes) - .child(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node, new NodeKey(new NodeId(node.getNodeIDString()))) - .augmentation(FlowCapableNode) - .child(Table, new TableKey(flow.getTableId())) - .child(org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow, new FlowKey(flow.id)) - .build; - modification.removeConfigurationData(flowPath); - val commitFuture = modification.commit(); + override removeFlowAsync(Node node, Flow flow, long rid) { + internalRemoveFlowAsync(node, flow, rid); return toStatus(true); } @@ -114,27 +125,6 @@ class FlowProgrammerAdapter implements IPluginInFlowProgrammerService, SalFlowLi } } - - private def writeFlow(org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow flow, NodeKey nodeKey) { - val modification = this._dataBrokerService.beginTransaction(); - val flowPath = InstanceIdentifier.builder(Nodes) - .child(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node, nodeKey) - .augmentation(FlowCapableNode) - .child(Table, new TableKey(flow.getTableId())) - .child(org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow, new FlowKey(flow.id)) - .build; - modification.putConfigurationData(flowPath, flow); - val commitFuture = modification.commit(); - try { - val result = commitFuture.get(); - val status = result.getResult(); - } catch (InterruptedException e) { - LOG.error(e.getMessage(), e); - } catch (ExecutionException e) { - LOG.error(e.getMessage(), e); - } - } - public static def toStatus(RpcResult result) { return toStatus(result.isSuccessful()); } @@ -158,7 +148,12 @@ class FlowProgrammerAdapter implements IPluginInFlowProgrammerService, SalFlowLi } override onFlowRemoved(FlowRemoved notification) { - flowProgrammerPublisher.flowRemoved(notification.node.toADNode,notification.toFlow()); + if(notification != null && notification.node != null) { + val adNode = notification.node.toADNode + if(adNode != null) { + flowProgrammerPublisher.flowRemoved(adNode,notification.toFlow(adNode)); + } + } } override onFlowUpdated(FlowUpdated notification) { @@ -177,4 +172,98 @@ class FlowProgrammerAdapter implements IPluginInFlowProgrammerService, SalFlowLi NodeExperimenterErrorNotification notification) { // NOOP : Not supported by AD SAL } + + private def Future> writeFlowAsync(org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow flow, NodeKey nodeKey){ + val modification = this._dataBrokerService.beginTransaction(); + val flowPath = InstanceIdentifier.builder(Nodes) + .child(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node, nodeKey) + .augmentation(FlowCapableNode) + .child(Table, new TableKey(flow.getTableId())) + .child(org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow, new FlowKey(flow.id)) + .build; + modification.putConfigurationData(flowPath, flow); + return modification.commit(); + } + + private def Future> internalAddFlowAsync(Node node, Flow flow, long rid){ + var flowId = getCache().get(flow); + if(flowId != null) { + removeFlow(node, flow); + return internalAddFlowAsync(node, flow, rid); + } + + flowId = UUID.randomUUID(); + getCache().put(flow, flowId); + + return writeFlowAsync(flow.toMDFlow(flowId.toString()), new NodeKey(new NodeId(node.getNodeIDString()))); + } + + private def Future> internalModifyFlowAsync(Node node, Flow oldFlow, Flow newFlow, long rid) { + var flowId = getCache().remove(oldFlow); + if(flowId == null){ + LOG.error("oldFlow not found in cache : " + oldFlow.hashCode); + flowId = UUID.randomUUID(); + getCache().put(oldFlow, flowId); + } + + getCache().put(newFlow, flowId); + return writeFlowAsync(newFlow.toMDFlow(flowId.toString()), new NodeKey(new NodeId(node.getNodeIDString()))); + } + + + private def Future> internalRemoveFlowAsync(Node node, Flow adflow, long rid){ + val flowId = getCache().remove(adflow); + if(flowId == null){ + //throw new IllegalArgumentException("adflow not found in cache : " + adflow.hashCode); + LOG.error("adflow not found in cache : " + adflow.hashCode); + return null; + } + val flow = adflow.toMDFlow(flowId.toString()); + val modification = this._dataBrokerService.beginTransaction(); + val flowPath = InstanceIdentifier.builder(Nodes) + .child(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node, new NodeKey(new NodeId(node.getNodeIDString()))) + .augmentation(FlowCapableNode) + .child(Table, new TableKey(flow.getTableId())) + .child(org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow, new FlowKey(flow.id)) + .build; + modification.removeConfigurationData(flowPath); + return modification.commit(); + } + + private def toFutureStatus(Future> future){ + if(future == null){ + return toStatus(true); + } + + try { + val result = future.get(); + return toStatus(result); + } catch (InterruptedException e) { + return processException(e); + } catch (ExecutionException e) { + return processException(e); + } catch (Exception e){ + processException(e); + } + return toStatus(false); + } + + private def Map getCache(){ + if(clusterGlobalServices == null){ + return new ConcurrentHashMap(); + } + + var cache = clusterGlobalServices.getCache(CACHE_NAME); + + if(cache == null) { + try { + cache = clusterGlobalServices.createCache(CACHE_NAME, EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL)); + } catch (CacheExistException e) { + cache = clusterGlobalServices.getCache(CACHE_NAME); + } + } + return cache as Map; + + } + }