Switch from using a Long as FlowId to a Uri
[controller.git] / opendaylight / md-sal / compatibility / sal-compatibility / src / main / java / org / opendaylight / controller / sal / compatibility / FlowProgrammerAdapter.xtend
index 450c7f1f239c4634b994fd6b71b941492321d8b4..35c641c45aae4e3d0b9f18233abd96228e8ca8fd 100644 (file)
@@ -1,12 +1,21 @@
 package org.opendaylight.controller.sal.compatibility
 
+import java.util.Map
+import java.util.UUID
 import java.util.concurrent.ExecutionException
+import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.Future
+import java.util.EnumSet
 import org.opendaylight.controller.sal.core.Node
 import org.opendaylight.controller.sal.flowprogrammer.Flow
 import org.opendaylight.controller.sal.flowprogrammer.IPluginInFlowProgrammerService
 import org.opendaylight.controller.sal.flowprogrammer.IPluginOutFlowProgrammerService
 import org.opendaylight.controller.sal.utils.Status
 import org.opendaylight.controller.sal.utils.StatusCode
+import org.opendaylight.controller.clustering.services.CacheExistException
+import org.opendaylight.controller.clustering.services.IClusterGlobalServices
+import org.opendaylight.controller.clustering.services.IClusterServices
+
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowAdded
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowRemoved
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SwitchFlowRemoved
@@ -20,19 +29,14 @@ import org.slf4j.LoggerFactory
 
 import org.opendaylight.controller.sal.binding.api.data.DataBrokerService
 import org.opendaylight.controller.md.sal.common.api.TransactionStatus
-import org.opendaylight.controller.md.sal.common.api.data.DataModification
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey
-import org.opendaylight.yangtools.yang.binding.DataObject
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInput
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowId
 
 
 import static extension org.opendaylight.controller.sal.compatibility.MDFlowMapping.*
@@ -43,6 +47,7 @@ import static extension org.opendaylight.controller.sal.compatibility.ToSalConve
 class FlowProgrammerAdapter implements IPluginInFlowProgrammerService, SalFlowListener {
 
     private static val LOG = LoggerFactory.getLogger(FlowProgrammerAdapter);
+    private static val CACHE_NAME = "flowprogrammeradapter.flowtoid";
 
     @Property
     private SalFlowService delegate;
@@ -53,39 +58,38 @@ class FlowProgrammerAdapter implements IPluginInFlowProgrammerService, SalFlowLi
     @Property
     private IPluginOutFlowProgrammerService flowProgrammerPublisher;
 
+    @Property
+    private IClusterGlobalServices clusterGlobalServices;
+
+
+    @Property
+    private Map<Flow, UUID> flowToFlowId = new ConcurrentHashMap<Flow, UUID>();
+
+
     override addFlow(Node node, Flow flow) {
-        return addFlowAsync(node,flow,0)
+        return toFutureStatus(internalAddFlowAsync(node,flow,0));
     }
 
     override modifyFlow(Node node, Flow oldFlow, Flow newFlow) {
-        return modifyFlowAsync(node, oldFlow,newFlow,0)
+        return toFutureStatus(internalModifyFlowAsync(node, oldFlow,newFlow,0));
     }
 
     override removeFlow(Node node, Flow flow) {
-        return removeFlowAsync(node, flow,0);
+        return toFutureStatus(internalRemoveFlowAsync(node, flow,0));
     }
 
     override addFlowAsync(Node node, Flow flow, long rid) {
-        writeFlow(flow.toMDFlow, new NodeKey(new NodeId(node.getNodeIDString())));
+        internalAddFlowAsync(node, flow, rid);
         return toStatus(true);
     }
 
     override modifyFlowAsync(Node node, Flow oldFlow, Flow newFlow, long rid) {
-        writeFlow(newFlow.toMDFlow, new NodeKey(new NodeId(node.getNodeIDString())));
+        internalModifyFlowAsync(node, oldFlow, newFlow, rid);
         return toStatus(true);
     }
 
-    override removeFlowAsync(Node node, Flow adflow, long rid) {
-        val flow = adflow.toMDFlow;
-        val modification = this._dataBrokerService.beginTransaction();
-        val flowPath = InstanceIdentifier.builder(Nodes)
-                .child(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node, new NodeKey(new NodeId(node.getNodeIDString())))
-                .augmentation(FlowCapableNode)
-                .child(Table, new TableKey(flow.getTableId()))
-                .child(org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow, new FlowKey(flow.id))
-                .build;
-        modification.removeConfigurationData(flowPath);
-        val commitFuture = modification.commit();
+    override removeFlowAsync(Node node, Flow flow, long rid) {
+        internalRemoveFlowAsync(node, flow, rid);
         return toStatus(true);
     }
 
@@ -114,27 +118,6 @@ class FlowProgrammerAdapter implements IPluginInFlowProgrammerService, SalFlowLi
         }
     }
 
-
-    private def writeFlow(org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow flow, NodeKey nodeKey) {
-        val modification = this._dataBrokerService.beginTransaction();
-        val flowPath = InstanceIdentifier.builder(Nodes)
-                .child(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node, nodeKey)
-                .augmentation(FlowCapableNode)
-                .child(Table, new TableKey(flow.getTableId()))
-                .child(org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow, new FlowKey(flow.id))
-                .build;
-        modification.putConfigurationData(flowPath, flow);
-        val commitFuture = modification.commit();
-        try {
-            val result = commitFuture.get();
-            val status = result.getResult();
-        } catch (InterruptedException e) {
-            LOG.error(e.getMessage(), e);
-        } catch (ExecutionException e) {
-            LOG.error(e.getMessage(), e);
-        }
-    }
-
     public static def toStatus(RpcResult<?> result) {
         return toStatus(result.isSuccessful());
     }
@@ -177,4 +160,90 @@ class FlowProgrammerAdapter implements IPluginInFlowProgrammerService, SalFlowLi
                 NodeExperimenterErrorNotification notification) {
         // NOOP : Not supported by AD SAL
     }
+
+    private def Future<RpcResult<TransactionStatus>> writeFlowAsync(org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow flow, NodeKey nodeKey){
+        val modification = this._dataBrokerService.beginTransaction();
+        val flowPath = InstanceIdentifier.builder(Nodes)
+                .child(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node, nodeKey)
+                .augmentation(FlowCapableNode)
+                .child(Table, new TableKey(flow.getTableId()))
+                .child(org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow, new FlowKey(flow.id))
+                .build;
+        modification.putConfigurationData(flowPath, flow);
+        return modification.commit();
+    }
+
+    private def Future<RpcResult<TransactionStatus>> internalAddFlowAsync(Node node, Flow flow, long rid){
+        var flowId = getCache().get(flow);
+        if(flowId != null) {
+            removeFlow(node, flow);
+            return internalAddFlowAsync(node, flow, rid);
+        }
+
+        flowId = UUID.randomUUID();
+        getCache().put(flow, flowId);
+
+        return writeFlowAsync(flow.toMDFlow(flowId.toString()), new NodeKey(new NodeId(node.getNodeIDString())));
+    }
+
+    private def Future<RpcResult<TransactionStatus>> internalModifyFlowAsync(Node node, Flow oldFlow, Flow newFlow, long rid) {
+        val flowId = getCache().remove(oldFlow);
+        if(flowId == null){
+            throw new IllegalArgumentException("oldFlow is unknown");
+        }
+
+        getCache().put(newFlow, flowId);
+        return writeFlowAsync(newFlow.toMDFlow(flowId.toString()), new NodeKey(new NodeId(node.getNodeIDString())));
+    }
+
+
+    private def Future<RpcResult<TransactionStatus>> internalRemoveFlowAsync(Node node, Flow adflow, long rid){
+        val flowId = getCache().remove(adflow);
+        if(flowId == null){
+            throw new IllegalArgumentException("adflow is unknown");
+        }
+        val flow = adflow.toMDFlow(flowId.toString());
+        val modification = this._dataBrokerService.beginTransaction();
+        val flowPath = InstanceIdentifier.builder(Nodes)
+                .child(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node, new NodeKey(new NodeId(node.getNodeIDString())))
+                .augmentation(FlowCapableNode)
+                .child(Table, new TableKey(flow.getTableId()))
+                .child(org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow, new FlowKey(flow.id))
+                .build;
+        modification.removeConfigurationData(flowPath);
+        return modification.commit();
+    }
+
+    private def toFutureStatus(Future<RpcResult<TransactionStatus>> future){
+        try {
+            val result = future.get();
+            return toStatus(result);
+        } catch (InterruptedException e) {
+            return processException(e);
+        } catch (ExecutionException e) {
+            return processException(e);
+        } catch (Exception e){
+            processException(e);
+        }
+        return toStatus(false);
+    }
+
+    private def Map<Flow, UUID> getCache(){
+        if(clusterGlobalServices == null){
+            return new ConcurrentHashMap<Flow, UUID>();
+        }
+
+        var cache = clusterGlobalServices.getCache(CACHE_NAME);
+
+        if(cache == null) {
+            try {
+                cache = clusterGlobalServices.createCache(CACHE_NAME, EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
+            } catch (CacheExistException e) {
+                cache = clusterGlobalServices.getCache(CACHE_NAME);
+            }
+        }
+        return cache as Map<Flow, UUID>;
+
+    }
+
 }