BUG-625: migrate ComponentActivator
[controller.git] / opendaylight / md-sal / compatibility / sal-compatibility / src / main / java / org / opendaylight / controller / sal / compatibility / FlowProgrammerAdapter.xtend
index da652e65a7a28bf3e51bc637519c2dc8d94f0a73..8a0874ee3123da1d919db48a5d32e476afb839fc 100644 (file)
@@ -1,21 +1,52 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
 package org.opendaylight.controller.sal.compatibility
 
+import java.util.Map
+import java.util.UUID
 import java.util.concurrent.ExecutionException
+import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.Future
+import java.util.EnumSet
 import org.opendaylight.controller.sal.core.Node
 import org.opendaylight.controller.sal.flowprogrammer.Flow
 import org.opendaylight.controller.sal.flowprogrammer.IPluginInFlowProgrammerService
 import org.opendaylight.controller.sal.flowprogrammer.IPluginOutFlowProgrammerService
 import org.opendaylight.controller.sal.utils.Status
 import org.opendaylight.controller.sal.utils.StatusCode
+import org.opendaylight.controller.clustering.services.CacheExistException
+import org.opendaylight.controller.clustering.services.IClusterGlobalServices
+import org.opendaylight.controller.clustering.services.IClusterServices
+
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowAdded
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowRemoved
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SwitchFlowRemoved
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowUpdated
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowListener
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowService
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.NodeErrorNotification
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.NodeExperimenterErrorNotification
 import org.opendaylight.yangtools.yang.common.RpcResult
 import org.slf4j.LoggerFactory
 
-import static org.opendaylight.controller.sal.compatibility.MDFlowMapping.*
+import org.opendaylight.controller.sal.binding.api.data.DataBrokerService
+import org.opendaylight.controller.md.sal.common.api.TransactionStatus
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId
+
+
+import static extension org.opendaylight.controller.sal.compatibility.MDFlowMapping.*
 
 import static extension org.opendaylight.controller.sal.compatibility.NodeMapping.*
 import static extension org.opendaylight.controller.sal.compatibility.ToSalConversionsUtils.*
@@ -23,67 +54,55 @@ import static extension org.opendaylight.controller.sal.compatibility.ToSalConve
 class FlowProgrammerAdapter implements IPluginInFlowProgrammerService, SalFlowListener {
 
     private static val LOG = LoggerFactory.getLogger(FlowProgrammerAdapter);
+    private static val CACHE_NAME = "flowprogrammeradapter.flowtoid";
 
     @Property
     private SalFlowService delegate;
+
+    @Property
+    private DataBrokerService dataBrokerService;
     
     @Property
     private IPluginOutFlowProgrammerService flowProgrammerPublisher;
 
+    @Property
+    private IClusterGlobalServices clusterGlobalServices;
+
+
+    @Property
+    private Map<Flow, UUID> flowToFlowId = new ConcurrentHashMap<Flow, UUID>();
+
+
     override addFlow(Node node, Flow flow) {
-        val input = addFlowInput(node, flow);
-        val future = delegate.addFlow(input);
-        try {
-            val result = future.get();
-            return toStatus(result); // how get status from result? conversion?
-        } catch (Exception e) {
-            return processException(e);
-        }
+        return toFutureStatus(internalAddFlowAsync(node,flow,0));
     }
 
     override modifyFlow(Node node, Flow oldFlow, Flow newFlow) {
-        val input = updateFlowInput(node, oldFlow, newFlow);
-        val future = delegate.updateFlow(input);
-        try {
-            val result = future.get();
-            return toStatus(result);
-        } catch (Exception e) {
-            return processException(e);
-        }
+        return toFutureStatus(internalModifyFlowAsync(node, oldFlow,newFlow,0));
     }
 
     override removeFlow(Node node, Flow flow) {
-        val input = removeFlowInput(node, flow);
-        val future = delegate.removeFlow(input);
-
-        try {
-            val result = future.get();
-            return toStatus(result);
-        } catch (Exception e) {
-            return processException(e);
-        }
+        return toFutureStatus(internalRemoveFlowAsync(node, flow,0));
     }
 
     override addFlowAsync(Node node, Flow flow, long rid) {
-        val input = addFlowInput(node, flow);
-        delegate.addFlow(input);
-        return new Status(StatusCode.SUCCESS);
+        internalAddFlowAsync(node, flow, rid);
+        return toStatus(true);
     }
 
     override modifyFlowAsync(Node node, Flow oldFlow, Flow newFlow, long rid) {
-        val input = updateFlowInput(node, oldFlow, newFlow);
-        delegate.updateFlow(input);
-        return new Status(StatusCode.SUCCESS);
+        internalModifyFlowAsync(node, oldFlow, newFlow, rid);
+        return toStatus(true);
     }
 
     override removeFlowAsync(Node node, Flow flow, long rid) {
-        val input = removeFlowInput(node, flow);
-        delegate.removeFlow(input);
-        return new Status(StatusCode.SUCCESS);
+        internalRemoveFlowAsync(node, flow, rid);
+        return toStatus(true);
     }
 
     override removeAllFlows(Node node) {
-        throw new UnsupportedOperationException("Not present in MD-SAL");
+        // I know this looks like a copout... but its exactly what the legacy OFplugin did
+        return new Status(StatusCode.SUCCESS);
     }
 
     override syncSendBarrierMessage(Node node) {
@@ -98,13 +117,17 @@ class FlowProgrammerAdapter implements IPluginInFlowProgrammerService, SalFlowLi
         return null;
     }
 
-    public static def toStatus(RpcResult<Void> result) {
-        if (result.isSuccessful()) {
+    private static def toStatus(boolean successful) {
+        if (successful) {
             return new Status(StatusCode.SUCCESS);
         } else {
             return new Status(StatusCode.INTERNALERROR);
         }
     }
+
+    public static def toStatus(RpcResult<?> result) {
+        return toStatus(result.isSuccessful());
+    }
     
     private static dispatch def Status processException(InterruptedException e) {
         LOG.error("Interruption occured during processing flow",e);
@@ -125,11 +148,122 @@ class FlowProgrammerAdapter implements IPluginInFlowProgrammerService, SalFlowLi
     }
     
     override onFlowRemoved(FlowRemoved notification) {
-        flowProgrammerPublisher.flowRemoved(notification.node.toADNode,notification.toFlow());
+        if(notification != null && notification.node != null) {
+            val adNode = notification.node.toADNode
+            if(adNode != null) {
+                flowProgrammerPublisher.flowRemoved(adNode,notification.toFlow(adNode));
+            }
+        } 
     }
     
     override onFlowUpdated(FlowUpdated notification) {
         // NOOP : Not supported by AD SAL
     }
     
+    override onSwitchFlowRemoved(SwitchFlowRemoved notification) {
+        // NOOP : Not supported by AD SAL
+    }
+    
+     override onNodeErrorNotification(NodeErrorNotification notification) {
+        // NOOP : Not supported by AD SAL
+    }
+    
+     override onNodeExperimenterErrorNotification(
+                NodeExperimenterErrorNotification notification) {
+        // NOOP : Not supported by AD SAL
+    }
+
+    private def Future<RpcResult<TransactionStatus>> writeFlowAsync(org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow flow, NodeKey nodeKey){
+        val modification = this._dataBrokerService.beginTransaction();
+        val flowPath = InstanceIdentifier.builder(Nodes)
+                .child(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node, nodeKey)
+                .augmentation(FlowCapableNode)
+                .child(Table, new TableKey(flow.getTableId()))
+                .child(org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow, new FlowKey(flow.id))
+                .build;
+        modification.putConfigurationData(flowPath, flow);
+        return modification.commit();
+    }
+
+    private def Future<RpcResult<TransactionStatus>> internalAddFlowAsync(Node node, Flow flow, long rid){
+        var flowId = getCache().get(flow);
+        if(flowId != null) {
+            removeFlow(node, flow);
+            return internalAddFlowAsync(node, flow, rid);
+        }
+
+        flowId = UUID.randomUUID();
+        getCache().put(flow, flowId);
+
+        return writeFlowAsync(flow.toMDFlow(flowId.toString()), new NodeKey(new NodeId(node.getNodeIDString())));
+    }
+
+    private def Future<RpcResult<TransactionStatus>> internalModifyFlowAsync(Node node, Flow oldFlow, Flow newFlow, long rid) {
+        var flowId = getCache().remove(oldFlow);
+        if(flowId == null){
+            LOG.error("oldFlow not found in cache : " + oldFlow.hashCode);
+            flowId = UUID.randomUUID();
+            getCache().put(oldFlow, flowId);
+        }
+
+        getCache().put(newFlow, flowId);
+        return writeFlowAsync(newFlow.toMDFlow(flowId.toString()), new NodeKey(new NodeId(node.getNodeIDString())));
+    }
+
+
+    private def Future<RpcResult<TransactionStatus>> internalRemoveFlowAsync(Node node, Flow adflow, long rid){
+        val flowId = getCache().remove(adflow);
+        if(flowId == null){
+            //throw new IllegalArgumentException("adflow not found in cache : " + adflow.hashCode);
+            LOG.error("adflow not found in cache : " + adflow.hashCode);
+            return null;
+        }
+        val flow = adflow.toMDFlow(flowId.toString());
+        val modification = this._dataBrokerService.beginTransaction();
+        val flowPath = InstanceIdentifier.builder(Nodes)
+                .child(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node, new NodeKey(new NodeId(node.getNodeIDString())))
+                .augmentation(FlowCapableNode)
+                .child(Table, new TableKey(flow.getTableId()))
+                .child(org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow, new FlowKey(flow.id))
+                .build;
+        modification.removeConfigurationData(flowPath);
+        return modification.commit();
+    }
+
+    private def toFutureStatus(Future<RpcResult<TransactionStatus>> future){
+        if(future == null){
+            return toStatus(true);
+        }
+
+        try {
+            val result = future.get();
+            return toStatus(result);
+        } catch (InterruptedException e) {
+            return processException(e);
+        } catch (ExecutionException e) {
+            return processException(e);
+        } catch (Exception e){
+            processException(e);
+        }
+        return toStatus(false);
+    }
+
+    private def Map<Flow, UUID> getCache(){
+        if(clusterGlobalServices == null){
+            return new ConcurrentHashMap<Flow, UUID>();
+        }
+
+        var cache = clusterGlobalServices.getCache(CACHE_NAME);
+
+        if(cache == null) {
+            try {
+                cache = clusterGlobalServices.createCache(CACHE_NAME, EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
+            } catch (CacheExistException e) {
+                cache = clusterGlobalServices.getCache(CACHE_NAME);
+            }
+        }
+        return cache as Map<Flow, UUID>;
+
+    }
+
 }