1 package org.opendaylight.controller.sal.compatibility
5 import java.util.concurrent.ExecutionException
6 import java.util.concurrent.ConcurrentHashMap
7 import java.util.concurrent.Future
8 import java.util.EnumSet
9 import org.opendaylight.controller.sal.core.Node
10 import org.opendaylight.controller.sal.flowprogrammer.Flow
11 import org.opendaylight.controller.sal.flowprogrammer.IPluginInFlowProgrammerService
12 import org.opendaylight.controller.sal.flowprogrammer.IPluginOutFlowProgrammerService
13 import org.opendaylight.controller.sal.utils.Status
14 import org.opendaylight.controller.sal.utils.StatusCode
15 import org.opendaylight.controller.clustering.services.CacheExistException
16 import org.opendaylight.controller.clustering.services.IClusterGlobalServices
17 import org.opendaylight.controller.clustering.services.IClusterServices
19 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowAdded
20 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowRemoved
21 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SwitchFlowRemoved
22 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowUpdated
23 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowListener
24 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowService
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.NodeErrorNotification
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.NodeExperimenterErrorNotification
27 import org.opendaylight.yangtools.yang.common.RpcResult
28 import org.slf4j.LoggerFactory
30 import org.opendaylight.controller.sal.binding.api.data.DataBrokerService
31 import org.opendaylight.controller.md.sal.common.api.TransactionStatus
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey
38 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId
42 import static extension org.opendaylight.controller.sal.compatibility.MDFlowMapping.*
44 import static extension org.opendaylight.controller.sal.compatibility.NodeMapping.*
45 import static extension org.opendaylight.controller.sal.compatibility.ToSalConversionsUtils.*
47 class FlowProgrammerAdapter implements IPluginInFlowProgrammerService, SalFlowListener {
49 private static val LOG = LoggerFactory.getLogger(FlowProgrammerAdapter);
50 private static val CACHE_NAME = "flowprogrammeradapter.flowtoid";
53 private SalFlowService delegate;
56 private DataBrokerService dataBrokerService;
59 private IPluginOutFlowProgrammerService flowProgrammerPublisher;
62 private IClusterGlobalServices clusterGlobalServices;
66 private Map<Flow, UUID> flowToFlowId = new ConcurrentHashMap<Flow, UUID>();
69 override addFlow(Node node, Flow flow) {
70 return toFutureStatus(internalAddFlowAsync(node,flow,0));
73 override modifyFlow(Node node, Flow oldFlow, Flow newFlow) {
74 return toFutureStatus(internalModifyFlowAsync(node, oldFlow,newFlow,0));
77 override removeFlow(Node node, Flow flow) {
78 return toFutureStatus(internalRemoveFlowAsync(node, flow,0));
81 override addFlowAsync(Node node, Flow flow, long rid) {
82 internalAddFlowAsync(node, flow, rid);
83 return toStatus(true);
86 override modifyFlowAsync(Node node, Flow oldFlow, Flow newFlow, long rid) {
87 internalModifyFlowAsync(node, oldFlow, newFlow, rid);
88 return toStatus(true);
91 override removeFlowAsync(Node node, Flow flow, long rid) {
92 internalRemoveFlowAsync(node, flow, rid);
93 return toStatus(true);
96 override removeAllFlows(Node node) {
97 // I know this looks like a copout... but its exactly what the legacy OFplugin did
98 return new Status(StatusCode.SUCCESS);
101 override syncSendBarrierMessage(Node node) {
103 // FIXME: Update YANG model
107 override asyncSendBarrierMessage(Node node) {
109 // FIXME: Update YANG model
113 private static def toStatus(boolean successful) {
115 return new Status(StatusCode.SUCCESS);
117 return new Status(StatusCode.INTERNALERROR);
121 public static def toStatus(RpcResult<?> result) {
122 return toStatus(result.isSuccessful());
125 private static dispatch def Status processException(InterruptedException e) {
126 LOG.error("Interruption occured during processing flow",e);
127 return new Status(StatusCode.INTERNALERROR);
130 private static dispatch def Status processException(ExecutionException e) {
131 LOG.error("Execution exception occured during processing flow",e.cause);
132 return new Status(StatusCode.INTERNALERROR);
135 private static dispatch def Status processException(Exception e) {
136 throw new RuntimeException(e);
139 override onFlowAdded(FlowAdded notification) {
140 // NOOP : Not supported by AD SAL
143 override onFlowRemoved(FlowRemoved notification) {
144 if(notification != null && notification.node != null) {
145 val adNode = notification.node.toADNode
147 flowProgrammerPublisher.flowRemoved(adNode,notification.toFlow(adNode));
152 override onFlowUpdated(FlowUpdated notification) {
153 // NOOP : Not supported by AD SAL
156 override onSwitchFlowRemoved(SwitchFlowRemoved notification) {
157 // NOOP : Not supported by AD SAL
160 override onNodeErrorNotification(NodeErrorNotification notification) {
161 // NOOP : Not supported by AD SAL
164 override onNodeExperimenterErrorNotification(
165 NodeExperimenterErrorNotification notification) {
166 // NOOP : Not supported by AD SAL
169 private def Future<RpcResult<TransactionStatus>> writeFlowAsync(org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow flow, NodeKey nodeKey){
170 val modification = this._dataBrokerService.beginTransaction();
171 val flowPath = InstanceIdentifier.builder(Nodes)
172 .child(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node, nodeKey)
173 .augmentation(FlowCapableNode)
174 .child(Table, new TableKey(flow.getTableId()))
175 .child(org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow, new FlowKey(flow.id))
177 modification.putConfigurationData(flowPath, flow);
178 return modification.commit();
181 private def Future<RpcResult<TransactionStatus>> internalAddFlowAsync(Node node, Flow flow, long rid){
182 var flowId = getCache().get(flow);
184 removeFlow(node, flow);
185 return internalAddFlowAsync(node, flow, rid);
188 flowId = UUID.randomUUID();
189 getCache().put(flow, flowId);
191 return writeFlowAsync(flow.toMDFlow(flowId.toString()), new NodeKey(new NodeId(node.getNodeIDString())));
194 private def Future<RpcResult<TransactionStatus>> internalModifyFlowAsync(Node node, Flow oldFlow, Flow newFlow, long rid) {
195 val flowId = getCache().remove(oldFlow);
197 throw new IllegalArgumentException("oldFlow is unknown");
200 getCache().put(newFlow, flowId);
201 return writeFlowAsync(newFlow.toMDFlow(flowId.toString()), new NodeKey(new NodeId(node.getNodeIDString())));
205 private def Future<RpcResult<TransactionStatus>> internalRemoveFlowAsync(Node node, Flow adflow, long rid){
206 val flowId = getCache().remove(adflow);
208 throw new IllegalArgumentException("adflow is unknown");
210 val flow = adflow.toMDFlow(flowId.toString());
211 val modification = this._dataBrokerService.beginTransaction();
212 val flowPath = InstanceIdentifier.builder(Nodes)
213 .child(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node, new NodeKey(new NodeId(node.getNodeIDString())))
214 .augmentation(FlowCapableNode)
215 .child(Table, new TableKey(flow.getTableId()))
216 .child(org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow, new FlowKey(flow.id))
218 modification.removeConfigurationData(flowPath);
219 return modification.commit();
222 private def toFutureStatus(Future<RpcResult<TransactionStatus>> future){
224 val result = future.get();
225 return toStatus(result);
226 } catch (InterruptedException e) {
227 return processException(e);
228 } catch (ExecutionException e) {
229 return processException(e);
230 } catch (Exception e){
233 return toStatus(false);
236 private def Map<Flow, UUID> getCache(){
237 if(clusterGlobalServices == null){
238 return new ConcurrentHashMap<Flow, UUID>();
241 var cache = clusterGlobalServices.getCache(CACHE_NAME);
245 cache = clusterGlobalServices.createCache(CACHE_NAME, EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
246 } catch (CacheExistException e) {
247 cache = clusterGlobalServices.getCache(CACHE_NAME);
250 return cache as Map<Flow, UUID>;