1 package org.opendaylight.controller.sal.compatibility
5 import java.util.concurrent.ExecutionException
6 import java.util.concurrent.ConcurrentHashMap
7 import java.util.concurrent.Future
8 import java.util.EnumSet
9 import org.opendaylight.controller.sal.core.Node
10 import org.opendaylight.controller.sal.flowprogrammer.Flow
11 import org.opendaylight.controller.sal.flowprogrammer.IPluginInFlowProgrammerService
12 import org.opendaylight.controller.sal.flowprogrammer.IPluginOutFlowProgrammerService
13 import org.opendaylight.controller.sal.utils.Status
14 import org.opendaylight.controller.sal.utils.StatusCode
15 import org.opendaylight.controller.clustering.services.CacheExistException
16 import org.opendaylight.controller.clustering.services.IClusterGlobalServices
17 import org.opendaylight.controller.clustering.services.IClusterServices
19 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowAdded
20 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowRemoved
21 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SwitchFlowRemoved
22 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowUpdated
23 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowListener
24 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowService
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.NodeErrorNotification
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.NodeExperimenterErrorNotification
27 import org.opendaylight.yangtools.yang.common.RpcResult
28 import org.slf4j.LoggerFactory
30 import org.opendaylight.controller.sal.binding.api.data.DataBrokerService
31 import org.opendaylight.controller.md.sal.common.api.TransactionStatus
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey
38 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId
42 import static extension org.opendaylight.controller.sal.compatibility.MDFlowMapping.*
44 import static extension org.opendaylight.controller.sal.compatibility.NodeMapping.*
45 import static extension org.opendaylight.controller.sal.compatibility.ToSalConversionsUtils.*
47 class FlowProgrammerAdapter implements IPluginInFlowProgrammerService, SalFlowListener {
// SLF4J logger for this adapter class.
49 private static val LOG = LoggerFactory.getLogger(FlowProgrammerAdapter);
// Name under which getCache() looks up, or creates, the cluster-wide Flow -> UUID cache.
50 private static val CACHE_NAME = "flowprogrammeradapter.flowtoid";
// NOTE(review): not referenced by any method visible in this listing - confirm it is still needed.
53 private SalFlowService delegate;
// Data broker whose transactions carry the flow writes and removals.
// Accessed below as this._dataBrokerService, which suggests a property annotation on a line
// that is not visible in this listing - TODO confirm.
56 private DataBrokerService dataBrokerService;
// Receiver of flowRemoved callbacks forwarded from onFlowRemoved().
59 private IPluginOutFlowProgrammerService flowProgrammerPublisher;
// Optional clustering service; getCache() checks it for null before using it.
62 private IClusterGlobalServices clusterGlobalServices;
// NOTE(review): not read by any method visible in this listing. getCache() returns a brand-new
// ConcurrentHashMap when clusterGlobalServices is null instead of this map, so ids stored by
// callers are not retained between calls in that case. This field looks like the intended
// local fallback - confirm.
66 private Map<Flow, UUID> flowToFlowId = new ConcurrentHashMap<Flow, UUID>();
/**
 * Adds a flow on the given node and waits for the result.
 *
 * Starts the write through internalAddFlowAsync with a request id of 0, then
 * converts the returned future into a Status via toFutureStatus, which blocks
 * on the future.
 *
 * @param node AD-SAL node the flow belongs to
 * @param flow AD-SAL flow to add
 * @return Status produced by toFutureStatus
 */
69 override addFlow(Node node, Flow flow) {
70 return toFutureStatus(internalAddFlowAsync(node,flow,0));
/**
 * Replaces oldFlow with newFlow on the given node and waits for the result.
 *
 * Delegates to internalModifyFlowAsync with a request id of 0 and converts the
 * returned future into a Status via toFutureStatus, which blocks on the future.
 *
 * @param node    AD-SAL node the flow belongs to
 * @param oldFlow flow previously added through this adapter
 * @param newFlow flow that takes its place
 * @return Status produced by toFutureStatus
 */
73 override modifyFlow(Node node, Flow oldFlow, Flow newFlow) {
74 return toFutureStatus(internalModifyFlowAsync(node, oldFlow,newFlow,0));
/**
 * Removes a flow from the given node and waits for the result.
 *
 * Delegates to internalRemoveFlowAsync with a request id of 0 and converts the
 * returned future into a Status via toFutureStatus, which blocks on the future.
 *
 * @param node AD-SAL node the flow belongs to
 * @param flow AD-SAL flow to remove
 * @return Status produced by toFutureStatus
 */
77 override removeFlow(Node node, Flow flow) {
78 return toFutureStatus(internalRemoveFlowAsync(node, flow,0));
/**
 * Starts adding a flow without waiting for the commit.
 *
 * The future returned by internalAddFlowAsync is discarded, so the returned
 * Status is always toStatus(true) and does not reflect the eventual outcome
 * of the write.
 *
 * @param node AD-SAL node the flow belongs to
 * @param flow AD-SAL flow to add
 * @param rid  request id forwarded to internalAddFlowAsync
 * @return the Status built by toStatus(true)
 */
81 override addFlowAsync(Node node, Flow flow, long rid) {
82 internalAddFlowAsync(node, flow, rid);
83 return toStatus(true);
/**
 * Starts replacing oldFlow with newFlow without waiting for the commit.
 *
 * The future returned by internalModifyFlowAsync is discarded, so the returned
 * Status is always toStatus(true) and does not reflect the eventual outcome
 * of the write.
 *
 * @param node    AD-SAL node the flow belongs to
 * @param oldFlow flow previously added through this adapter
 * @param newFlow flow that takes its place
 * @param rid     request id forwarded to internalModifyFlowAsync
 * @return the Status built by toStatus(true)
 */
86 override modifyFlowAsync(Node node, Flow oldFlow, Flow newFlow, long rid) {
87 internalModifyFlowAsync(node, oldFlow, newFlow, rid);
88 return toStatus(true);
/**
 * Starts removing a flow without waiting for the commit.
 *
 * The future returned by internalRemoveFlowAsync is discarded, so the returned
 * Status is always toStatus(true) and does not reflect the eventual outcome
 * of the removal.
 *
 * @param node AD-SAL node the flow belongs to
 * @param flow AD-SAL flow to remove
 * @param rid  request id forwarded to internalRemoveFlowAsync
 * @return the Status built by toStatus(true)
 */
91 override removeFlowAsync(Node node, Flow flow, long rid) {
92 internalRemoveFlowAsync(node, flow, rid);
93 return toStatus(true);
/**
 * Reports success without removing anything: the visible body performs no
 * flow operation and returns a SUCCESS status unconditionally.
 *
 * @param node AD-SAL node (unused in the visible lines)
 * @return a Status with StatusCode.SUCCESS
 */
96 override removeAllFlows(Node node) {
97 // This looks like a cop-out, but it is exactly what the legacy OF plugin did.
98 return new Status(StatusCode.SUCCESS);
/**
 * Synchronous barrier request for the given node.
 *
 * No barrier logic is visible in this listing: the only visible body line is
 * the FIXME below, and the statement that produces the return value is not shown.
 *
 * @param node AD-SAL node the barrier is requested for
 */
101 override syncSendBarrierMessage(Node node) {
103 // FIXME: Update YANG model
/**
 * Asynchronous barrier request for the given node.
 *
 * No barrier logic is visible in this listing: the only visible body line is
 * the FIXME below, and the statement that produces the return value is not shown.
 *
 * @param node AD-SAL node the barrier is requested for
 */
107 override asyncSendBarrierMessage(Node node) {
109 // FIXME: Update YANG model
/**
 * Maps a boolean outcome to an AD-SAL Status.
 *
 * The two visible returns are SUCCESS and INTERNALERROR; the branching lines
 * are not visible in this listing - presumably SUCCESS is returned when
 * successful is true and INTERNALERROR otherwise (TODO confirm).
 *
 * @param successful outcome to translate
 * @return a Status with StatusCode.SUCCESS or StatusCode.INTERNALERROR
 */
113 private static def toStatus(boolean successful) {
115 return new Status(StatusCode.SUCCESS);
117 return new Status(StatusCode.INTERNALERROR);
/**
 * Maps an RPC result to an AD-SAL Status using only its success flag;
 * nothing else from the result is read here.
 *
 * @param result RPC result to translate
 * @return the Status built by toStatus(boolean) for result.isSuccessful()
 */
121 public static def toStatus(RpcResult<?> result) {
122 return toStatus(result.isSuccessful());
/**
 * Dispatch case for InterruptedException: logs the exception at error level
 * and reports INTERNALERROR.
 *
 * NOTE(review): the visible lines do not re-assert the thread's interrupt
 * status (no Thread.currentThread().interrupt()), so callers further up
 * cannot observe the interruption - consider restoring it.
 *
 * @param e the interruption raised while waiting for a flow operation
 * @return a Status with StatusCode.INTERNALERROR
 */
125 private static dispatch def Status processException(InterruptedException e) {
126 LOG.error("Interruption occured during processing flow",e);
127 return new Status(StatusCode.INTERNALERROR);
/**
 * Dispatch case for ExecutionException: logs the underlying cause (e.cause)
 * at error level and reports INTERNALERROR.
 *
 * @param e the failure raised by the flow operation's future
 * @return a Status with StatusCode.INTERNALERROR
 */
130 private static dispatch def Status processException(ExecutionException e) {
131 LOG.error("Execution exception occured during processing flow",e.cause);
132 return new Status(StatusCode.INTERNALERROR);
/**
 * Dispatch fallback for any other Exception: does not produce a Status but
 * rethrows the exception wrapped in a RuntimeException.
 *
 * @param e the unexpected exception
 * @throws RuntimeException always, with e as its cause
 */
135 private static dispatch def Status processException(Exception e) {
136 throw new RuntimeException(e);
/**
 * Listener callback for FlowAdded notifications; intentionally does nothing.
 *
 * @param notification the flow-added notification (ignored)
 */
139 override onFlowAdded(FlowAdded notification) {
140 // NOOP : Not supported by AD SAL
/**
 * Listener callback for FlowRemoved notifications.
 *
 * Forwards the event to flowProgrammerPublisher.flowRemoved, passing the
 * notification's node converted with toADNode and the notification converted
 * with toFlow().
 *
 * NOTE(review): flowProgrammerPublisher is dereferenced without a null check
 * in the visible line - confirm it is always set before notifications arrive.
 *
 * @param notification the flow-removed notification to forward
 */
143 override onFlowRemoved(FlowRemoved notification) {
144 flowProgrammerPublisher.flowRemoved(notification.node.toADNode,notification.toFlow());
/**
 * Listener callback for FlowUpdated notifications; intentionally does nothing.
 *
 * @param notification the flow-updated notification (ignored)
 */
147 override onFlowUpdated(FlowUpdated notification) {
148 // NOOP : Not supported by AD SAL
/**
 * Listener callback for SwitchFlowRemoved notifications; intentionally does nothing.
 *
 * @param notification the switch-flow-removed notification (ignored)
 */
151 override onSwitchFlowRemoved(SwitchFlowRemoved notification) {
152 // NOOP : Not supported by AD SAL
/**
 * Listener callback for NodeErrorNotification; intentionally does nothing.
 *
 * @param notification the node error notification (ignored)
 */
155 override onNodeErrorNotification(NodeErrorNotification notification) {
156 // NOOP : Not supported by AD SAL
/**
 * Listener callback for NodeExperimenterErrorNotification; intentionally does nothing.
 *
 * @param notification the experimenter error notification (ignored)
 */
159 override onNodeExperimenterErrorNotification(
160 NodeExperimenterErrorNotification notification) {
161 // NOOP : Not supported by AD SAL
/**
 * Writes an MD-SAL flow into configuration data and commits the transaction.
 *
 * @param flow    MD-SAL flow to store; its table id and id select the target path
 * @param nodeKey key of the inventory node that owns the flow
 * @return the future returned by the transaction's commit()
 */
164 private def Future<RpcResult<TransactionStatus>> writeFlowAsync(org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow flow, NodeKey nodeKey){
165 val modification = this._dataBrokerService.beginTransaction();
// Path: Nodes -> Node[nodeKey] -> FlowCapableNode augmentation -> Table[flow.tableId] -> Flow[flow.id].
// The line that terminates this builder chain is not visible in this listing.
166 val flowPath = InstanceIdentifier.builder(Nodes)
167 .child(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node, nodeKey)
168 .augmentation(FlowCapableNode)
169 .child(Table, new TableKey(flow.getTableId()))
170 .child(org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow, new FlowKey(flow.id))
172 modification.putConfigurationData(flowPath, flow);
173 return modification.commit();
/**
 * Assigns a flow id to an AD-SAL flow, records it in the cache and writes the
 * converted MD-SAL flow.
 *
 * @param node AD-SAL node the flow belongs to; its node id string becomes the NodeKey
 * @param flow AD-SAL flow to add
 * @param rid  request id; in the visible lines it is only passed on to the recursive call
 * @return the future returned by writeFlowAsync
 */
176 private def Future<RpcResult<TransactionStatus>> internalAddFlowAsync(Node node, Flow flow, long rid){
177 var flowId = getCache().get(flow);
// The guard around the next two lines is not visible in this listing - presumably they run
// only when the flow already has an id: remove it first (blocking), then retry the add.
179 removeFlow(node, flow);
180 return internalAddFlowAsync(node, flow, rid);
// Allocate a fresh random id and remember it so later modify/remove calls can find the flow.
183 flowId = UUID.randomUUID();
184 getCache().put(flow, flowId);
186 return writeFlowAsync(flow.toMDFlow(flowId.toString()), new NodeKey(new NodeId(node.getNodeIDString())));
/**
 * Re-keys the cached flow id from oldFlow to newFlow and writes newFlow under
 * that same id.
 *
 * @param node    AD-SAL node the flow belongs to; its node id string becomes the NodeKey
 * @param oldFlow flow whose cached id is taken over
 * @param newFlow flow written in its place
 * @param rid     request id (not used in the visible lines)
 * @return the future returned by writeFlowAsync
 * @throws IllegalArgumentException with message "oldFlow is unknown"; the guard line is not
 *         visible in this listing - presumably thrown when no id was cached for oldFlow
 */
189 private def Future<RpcResult<TransactionStatus>> internalModifyFlowAsync(Node node, Flow oldFlow, Flow newFlow, long rid) {
// remove() both looks up and drops the old mapping in one step.
190 val flowId = getCache().remove(oldFlow);
192 throw new IllegalArgumentException("oldFlow is unknown");
195 getCache().put(newFlow, flowId);
196 return writeFlowAsync(newFlow.toMDFlow(flowId.toString()), new NodeKey(new NodeId(node.getNodeIDString())));
/**
 * Drops the cached id of an AD-SAL flow and removes the matching MD-SAL flow
 * from configuration data.
 *
 * @param node   AD-SAL node the flow belongs to; its node id string becomes the NodeKey
 * @param adflow AD-SAL flow to remove
 * @param rid    request id (not used in the visible lines)
 * @return the future returned by the transaction's commit()
 * @throws IllegalArgumentException with message "adflow is unknown"; the guard line is not
 *         visible in this listing - presumably thrown when no id was cached for adflow
 */
200 private def Future<RpcResult<TransactionStatus>> internalRemoveFlowAsync(Node node, Flow adflow, long rid){
// The mapping is removed from the cache before the transaction is committed.
201 val flowId = getCache().remove(adflow);
203 throw new IllegalArgumentException("adflow is unknown");
// Converted only to obtain the table id and flow id used in the path below.
205 val flow = adflow.toMDFlow(flowId.toString());
206 val modification = this._dataBrokerService.beginTransaction();
// Path: Nodes -> Node[node id] -> FlowCapableNode augmentation -> Table[flow.tableId] -> Flow[flow.id].
// The line that terminates this builder chain is not visible in this listing.
207 val flowPath = InstanceIdentifier.builder(Nodes)
208 .child(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node, new NodeKey(new NodeId(node.getNodeIDString())))
209 .augmentation(FlowCapableNode)
210 .child(Table, new TableKey(flow.getTableId()))
211 .child(org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow, new FlowKey(flow.id))
213 modification.removeConfigurationData(flowPath);
214 return modification.commit();
/**
 * Blocks on the given future and converts its outcome into an AD-SAL Status.
 *
 * A completed future is translated by toStatus(RpcResult). InterruptedException
 * and ExecutionException are turned into a Status by processException. The line
 * that opens the try block and the body of the generic Exception handler are
 * not visible in this listing.
 *
 * @param future pending result of a flow transaction
 * @return the resulting Status; toStatus(false) if control reaches the final return
 */
217 private def toFutureStatus(Future<RpcResult<TransactionStatus>> future){
// get() waits for the transaction to finish.
219 val result = future.get();
220 return toStatus(result);
221 } catch (InterruptedException e) {
222 return processException(e);
223 } catch (ExecutionException e) {
224 return processException(e);
225 } catch (Exception e){
228 return toStatus(false);
/**
 * Returns the Flow -> UUID map used to remember the id assigned to each AD-SAL flow.
 *
 * With clustering available, the map is the cluster cache named CACHE_NAME,
 * created in TRANSACTIONAL mode where needed.
 *
 * NOTE(review): when clusterGlobalServices is null, every call returns a new,
 * empty ConcurrentHashMap. Ids stored by internalAddFlowAsync are therefore not
 * visible to later get()/remove() calls, and the flowToFlowId field is never
 * used. Returning that field here looks like the intent - confirm.
 *
 * @return the cluster cache cast to Map<Flow, UUID>, or a fresh empty map without clustering
 */
231 private def Map<Flow, UUID> getCache(){
232 if(clusterGlobalServices == null){
233 return new ConcurrentHashMap<Flow, UUID>();
236 var cache = clusterGlobalServices.getCache(CACHE_NAME);
// The condition guarding creation is not visible in this listing - presumably the cache is
// created only when the lookup above found none. If the cache already exists
// (CacheExistException), it is looked up again instead.
240 cache = clusterGlobalServices.createCache(CACHE_NAME, EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
241 } catch (CacheExistException e) {
242 cache = clusterGlobalServices.getCache(CACHE_NAME);
245 return cache as Map<Flow, UUID>;