Merge "Add support for identity-ref config attributes to config/netconf subsystem"
[controller.git] / opendaylight / md-sal / compatibility / sal-compatibility / src / main / java / org / opendaylight / controller / sal / compatibility / FlowProgrammerAdapter.xtend
1 package org.opendaylight.controller.sal.compatibility
2
3 import java.util.Map
4 import java.util.UUID
5 import java.util.concurrent.ExecutionException
6 import java.util.concurrent.ConcurrentHashMap
7 import java.util.concurrent.Future
8 import java.util.EnumSet
9 import org.opendaylight.controller.sal.core.Node
10 import org.opendaylight.controller.sal.flowprogrammer.Flow
11 import org.opendaylight.controller.sal.flowprogrammer.IPluginInFlowProgrammerService
12 import org.opendaylight.controller.sal.flowprogrammer.IPluginOutFlowProgrammerService
13 import org.opendaylight.controller.sal.utils.Status
14 import org.opendaylight.controller.sal.utils.StatusCode
15 import org.opendaylight.controller.clustering.services.CacheExistException
16 import org.opendaylight.controller.clustering.services.IClusterGlobalServices
17 import org.opendaylight.controller.clustering.services.IClusterServices
18
19 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowAdded
20 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowRemoved
21 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SwitchFlowRemoved
22 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowUpdated
23 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowListener
24 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowService
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.NodeErrorNotification
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.NodeExperimenterErrorNotification
27 import org.opendaylight.yangtools.yang.common.RpcResult
28 import org.slf4j.LoggerFactory
29
30 import org.opendaylight.controller.sal.binding.api.data.DataBrokerService
31 import org.opendaylight.controller.md.sal.common.api.TransactionStatus
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey
38 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId
40
41
42 import static extension org.opendaylight.controller.sal.compatibility.MDFlowMapping.*
43
44 import static extension org.opendaylight.controller.sal.compatibility.NodeMapping.*
45 import static extension org.opendaylight.controller.sal.compatibility.ToSalConversionsUtils.*
46
47 class FlowProgrammerAdapter implements IPluginInFlowProgrammerService, SalFlowListener {
48
49     private static val LOG = LoggerFactory.getLogger(FlowProgrammerAdapter);
50     private static val CACHE_NAME = "flowprogrammeradapter.flowtoid";
51
52     @Property
53     private SalFlowService delegate;
54
55     @Property
56     private DataBrokerService dataBrokerService;
57     
58     @Property
59     private IPluginOutFlowProgrammerService flowProgrammerPublisher;
60
61     @Property
62     private IClusterGlobalServices clusterGlobalServices;
63
64
65     @Property
66     private Map<Flow, UUID> flowToFlowId = new ConcurrentHashMap<Flow, UUID>();
67
68
69     override addFlow(Node node, Flow flow) {
70         return toFutureStatus(internalAddFlowAsync(node,flow,0));
71     }
72
73     override modifyFlow(Node node, Flow oldFlow, Flow newFlow) {
74         return toFutureStatus(internalModifyFlowAsync(node, oldFlow,newFlow,0));
75     }
76
77     override removeFlow(Node node, Flow flow) {
78         return toFutureStatus(internalRemoveFlowAsync(node, flow,0));
79     }
80
81     override addFlowAsync(Node node, Flow flow, long rid) {
82         internalAddFlowAsync(node, flow, rid);
83         return toStatus(true);
84     }
85
86     override modifyFlowAsync(Node node, Flow oldFlow, Flow newFlow, long rid) {
87         internalModifyFlowAsync(node, oldFlow, newFlow, rid);
88         return toStatus(true);
89     }
90
91     override removeFlowAsync(Node node, Flow flow, long rid) {
92         internalRemoveFlowAsync(node, flow, rid);
93         return toStatus(true);
94     }
95
96     override removeAllFlows(Node node) {
97         // I know this looks like a copout... but its exactly what the legacy OFplugin did
98         return new Status(StatusCode.SUCCESS);
99     }
100
101     override syncSendBarrierMessage(Node node) {
102
103         // FIXME: Update YANG model
104         return null;
105     }
106
107     override asyncSendBarrierMessage(Node node) {
108
109         // FIXME: Update YANG model
110         return null;
111     }
112
113     private static def toStatus(boolean successful) {
114         if (successful) {
115             return new Status(StatusCode.SUCCESS);
116         } else {
117             return new Status(StatusCode.INTERNALERROR);
118         }
119     }
120
121     public static def toStatus(RpcResult<?> result) {
122         return toStatus(result.isSuccessful());
123     }
124     
125     private static dispatch def Status processException(InterruptedException e) {
126         LOG.error("Interruption occured during processing flow",e);
127         return new Status(StatusCode.INTERNALERROR);
128     }
129     
130     private static dispatch def Status processException(ExecutionException e) {
131         LOG.error("Execution exception occured during processing flow",e.cause);
132         return new Status(StatusCode.INTERNALERROR);
133     }
134     
135     private static dispatch def Status processException(Exception e) {
136         throw new RuntimeException(e);
137     }
138     
139     override onFlowAdded(FlowAdded notification) {
140         // NOOP : Not supported by AD SAL
141     }
142     
143     override onFlowRemoved(FlowRemoved notification) {
144         flowProgrammerPublisher.flowRemoved(notification.node.toADNode,notification.toFlow(notification.node.toADNode));
145     }
146     
147     override onFlowUpdated(FlowUpdated notification) {
148         // NOOP : Not supported by AD SAL
149     }
150     
151     override onSwitchFlowRemoved(SwitchFlowRemoved notification) {
152         // NOOP : Not supported by AD SAL
153     }
154     
155      override onNodeErrorNotification(NodeErrorNotification notification) {
156         // NOOP : Not supported by AD SAL
157     }
158     
159      override onNodeExperimenterErrorNotification(
160                 NodeExperimenterErrorNotification notification) {
161         // NOOP : Not supported by AD SAL
162     }
163
164     private def Future<RpcResult<TransactionStatus>> writeFlowAsync(org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow flow, NodeKey nodeKey){
165         val modification = this._dataBrokerService.beginTransaction();
166         val flowPath = InstanceIdentifier.builder(Nodes)
167                 .child(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node, nodeKey)
168                 .augmentation(FlowCapableNode)
169                 .child(Table, new TableKey(flow.getTableId()))
170                 .child(org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow, new FlowKey(flow.id))
171                 .build;
172         modification.putConfigurationData(flowPath, flow);
173         return modification.commit();
174     }
175
176     private def Future<RpcResult<TransactionStatus>> internalAddFlowAsync(Node node, Flow flow, long rid){
177         var flowId = getCache().get(flow);
178         if(flowId != null) {
179             removeFlow(node, flow);
180             return internalAddFlowAsync(node, flow, rid);
181         }
182
183         flowId = UUID.randomUUID();
184         getCache().put(flow, flowId);
185
186         return writeFlowAsync(flow.toMDFlow(flowId.toString()), new NodeKey(new NodeId(node.getNodeIDString())));
187     }
188
189     private def Future<RpcResult<TransactionStatus>> internalModifyFlowAsync(Node node, Flow oldFlow, Flow newFlow, long rid) {
190         val flowId = getCache().remove(oldFlow);
191         if(flowId == null){
192             throw new IllegalArgumentException("oldFlow is unknown");
193         }
194
195         getCache().put(newFlow, flowId);
196         return writeFlowAsync(newFlow.toMDFlow(flowId.toString()), new NodeKey(new NodeId(node.getNodeIDString())));
197     }
198
199
200     private def Future<RpcResult<TransactionStatus>> internalRemoveFlowAsync(Node node, Flow adflow, long rid){
201         val flowId = getCache().remove(adflow);
202         if(flowId == null){
203             throw new IllegalArgumentException("adflow is unknown");
204         }
205         val flow = adflow.toMDFlow(flowId.toString());
206         val modification = this._dataBrokerService.beginTransaction();
207         val flowPath = InstanceIdentifier.builder(Nodes)
208                 .child(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node, new NodeKey(new NodeId(node.getNodeIDString())))
209                 .augmentation(FlowCapableNode)
210                 .child(Table, new TableKey(flow.getTableId()))
211                 .child(org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow, new FlowKey(flow.id))
212                 .build;
213         modification.removeConfigurationData(flowPath);
214         return modification.commit();
215     }
216
217     private def toFutureStatus(Future<RpcResult<TransactionStatus>> future){
218         try {
219             val result = future.get();
220             return toStatus(result);
221         } catch (InterruptedException e) {
222             return processException(e);
223         } catch (ExecutionException e) {
224             return processException(e);
225         } catch (Exception e){
226             processException(e);
227         }
228         return toStatus(false);
229     }
230
231     private def Map<Flow, UUID> getCache(){
232         if(clusterGlobalServices == null){
233             return new ConcurrentHashMap<Flow, UUID>();
234         }
235
236         var cache = clusterGlobalServices.getCache(CACHE_NAME);
237
238         if(cache == null) {
239             try {
240                 cache = clusterGlobalServices.createCache(CACHE_NAME, EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
241             } catch (CacheExistException e) {
242                 cache = clusterGlobalServices.getCache(CACHE_NAME);
243             }
244         }
245         return cache as Map<Flow, UUID>;
246
247     }
248
249 }