package org.opendaylight.controller.sal.compatibility
+import java.util.Map
+import java.util.UUID
import java.util.concurrent.ExecutionException
+import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.Future
+import java.util.EnumSet
import org.opendaylight.controller.sal.core.Node
import org.opendaylight.controller.sal.flowprogrammer.Flow
import org.opendaylight.controller.sal.flowprogrammer.IPluginInFlowProgrammerService
import org.opendaylight.controller.sal.flowprogrammer.IPluginOutFlowProgrammerService
import org.opendaylight.controller.sal.utils.Status
import org.opendaylight.controller.sal.utils.StatusCode
+import org.opendaylight.controller.clustering.services.CacheExistException
+import org.opendaylight.controller.clustering.services.IClusterGlobalServices
+import org.opendaylight.controller.clustering.services.IClusterServices
+
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowAdded
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowRemoved
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SwitchFlowRemoved
import org.opendaylight.controller.sal.binding.api.data.DataBrokerService
import org.opendaylight.controller.md.sal.common.api.TransactionStatus
-import org.opendaylight.controller.md.sal.common.api.data.DataModification
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey
-import org.opendaylight.yangtools.yang.binding.DataObject
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInput
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowId
import static extension org.opendaylight.controller.sal.compatibility.MDFlowMapping.*
class FlowProgrammerAdapter implements IPluginInFlowProgrammerService, SalFlowListener {
private static val LOG = LoggerFactory.getLogger(FlowProgrammerAdapter);
+ private static val CACHE_NAME = "flowprogrammeradapter.flowtoid";
@Property
private SalFlowService delegate;
@Property
private IPluginOutFlowProgrammerService flowProgrammerPublisher;
+ @Property
+ private IClusterGlobalServices clusterGlobalServices;
+
+
+ @Property
+ private Map<Flow, UUID> flowToFlowId = new ConcurrentHashMap<Flow, UUID>();
+
+
override addFlow(Node node, Flow flow) {
- return addFlowAsync(node,flow,0)
+ return toFutureStatus(internalAddFlowAsync(node,flow,0));
}
override modifyFlow(Node node, Flow oldFlow, Flow newFlow) {
- return modifyFlowAsync(node, oldFlow,newFlow,0)
+ return toFutureStatus(internalModifyFlowAsync(node, oldFlow,newFlow,0));
}
override removeFlow(Node node, Flow flow) {
- return removeFlowAsync(node, flow,0);
+ return toFutureStatus(internalRemoveFlowAsync(node, flow,0));
}
override addFlowAsync(Node node, Flow flow, long rid) {
- writeFlow(flow.toMDFlow, new NodeKey(new NodeId(node.getNodeIDString())));
+ internalAddFlowAsync(node, flow, rid);
return toStatus(true);
}
override modifyFlowAsync(Node node, Flow oldFlow, Flow newFlow, long rid) {
- writeFlow(newFlow.toMDFlow, new NodeKey(new NodeId(node.getNodeIDString())));
+ internalModifyFlowAsync(node, oldFlow, newFlow, rid);
return toStatus(true);
}
- override removeFlowAsync(Node node, Flow adflow, long rid) {
- val flow = adflow.toMDFlow;
- val modification = this._dataBrokerService.beginTransaction();
- val flowPath = InstanceIdentifier.builder(Nodes)
- .child(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node, new NodeKey(new NodeId(node.getNodeIDString())))
- .augmentation(FlowCapableNode)
- .child(Table, new TableKey(flow.getTableId()))
- .child(org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow, new FlowKey(flow.id))
- .build;
- modification.removeConfigurationData(flowPath);
- val commitFuture = modification.commit();
+ override removeFlowAsync(Node node, Flow flow, long rid) {
+ internalRemoveFlowAsync(node, flow, rid);
return toStatus(true);
}
}
}
-
- private def writeFlow(org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow flow, NodeKey nodeKey) {
- val modification = this._dataBrokerService.beginTransaction();
- val flowPath = InstanceIdentifier.builder(Nodes)
- .child(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node, nodeKey)
- .augmentation(FlowCapableNode)
- .child(Table, new TableKey(flow.getTableId()))
- .child(org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow, new FlowKey(flow.id))
- .build;
- modification.putConfigurationData(flowPath, flow);
- val commitFuture = modification.commit();
- try {
- val result = commitFuture.get();
- val status = result.getResult();
- } catch (InterruptedException e) {
- LOG.error(e.getMessage(), e);
- } catch (ExecutionException e) {
- LOG.error(e.getMessage(), e);
- }
- }
-
public static def toStatus(RpcResult<?> result) {
return toStatus(result.isSuccessful());
}
NodeExperimenterErrorNotification notification) {
// NOOP : Not supported by AD SAL
}
+
+ private def Future<RpcResult<TransactionStatus>> writeFlowAsync(org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow flow, NodeKey nodeKey){
+ val modification = this._dataBrokerService.beginTransaction();
+ val flowPath = InstanceIdentifier.builder(Nodes)
+ .child(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node, nodeKey)
+ .augmentation(FlowCapableNode)
+ .child(Table, new TableKey(flow.getTableId()))
+ .child(org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow, new FlowKey(flow.id))
+ .build;
+ modification.putConfigurationData(flowPath, flow);
+ return modification.commit();
+ }
+
+ private def Future<RpcResult<TransactionStatus>> internalAddFlowAsync(Node node, Flow flow, long rid){
+ var flowId = getCache().get(flow);
+ if(flowId != null) {
+ removeFlow(node, flow);
+ return internalAddFlowAsync(node, flow, rid);
+ }
+
+ flowId = UUID.randomUUID();
+ getCache().put(flow, flowId);
+
+ return writeFlowAsync(flow.toMDFlow(flowId.toString()), new NodeKey(new NodeId(node.getNodeIDString())));
+ }
+
+ private def Future<RpcResult<TransactionStatus>> internalModifyFlowAsync(Node node, Flow oldFlow, Flow newFlow, long rid) {
+ val flowId = getCache().remove(oldFlow);
+ if(flowId == null){
+ throw new IllegalArgumentException("oldFlow is unknown");
+ }
+
+ getCache().put(newFlow, flowId);
+ return writeFlowAsync(newFlow.toMDFlow(flowId.toString()), new NodeKey(new NodeId(node.getNodeIDString())));
+ }
+
+
+ private def Future<RpcResult<TransactionStatus>> internalRemoveFlowAsync(Node node, Flow adflow, long rid){
+ val flowId = getCache().remove(adflow);
+ if(flowId == null){
+ throw new IllegalArgumentException("adflow is unknown");
+ }
+ val flow = adflow.toMDFlow(flowId.toString());
+ val modification = this._dataBrokerService.beginTransaction();
+ val flowPath = InstanceIdentifier.builder(Nodes)
+ .child(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node, new NodeKey(new NodeId(node.getNodeIDString())))
+ .augmentation(FlowCapableNode)
+ .child(Table, new TableKey(flow.getTableId()))
+ .child(org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow, new FlowKey(flow.id))
+ .build;
+ modification.removeConfigurationData(flowPath);
+ return modification.commit();
+ }
+
+ private def toFutureStatus(Future<RpcResult<TransactionStatus>> future){
+ try {
+ val result = future.get();
+ return toStatus(result);
+ } catch (InterruptedException e) {
+ return processException(e);
+ } catch (ExecutionException e) {
+ return processException(e);
+ } catch (Exception e){
+ processException(e);
+ }
+ return toStatus(false);
+ }
+
+ private def Map<Flow, UUID> getCache(){
+ if(clusterGlobalServices == null){
+ return new ConcurrentHashMap<Flow, UUID>();
+ }
+
+ var cache = clusterGlobalServices.getCache(CACHE_NAME);
+
+ if(cache == null) {
+ try {
+ cache = clusterGlobalServices.createCache(CACHE_NAME, EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
+ } catch (CacheExistException e) {
+ cache = clusterGlobalServices.getCache(CACHE_NAME);
+ }
+ }
+ return cache as Map<Flow, UUID>;
+
+ }
+
}