Merge "Increase timeout for waiting for broker service in sal-binding-it."
[controller.git] / opendaylight / md-sal / compatibility / sal-compatibility / src / main / java / org / opendaylight / controller / sal / compatibility / FlowProgrammerAdapter.xtend
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.sal.compatibility
9
10 import java.util.Map
11 import java.util.UUID
12 import java.util.concurrent.ExecutionException
13 import java.util.concurrent.ConcurrentHashMap
14 import java.util.concurrent.Future
15 import java.util.EnumSet
16 import org.opendaylight.controller.sal.core.Node
17 import org.opendaylight.controller.sal.flowprogrammer.Flow
18 import org.opendaylight.controller.sal.flowprogrammer.IPluginInFlowProgrammerService
19 import org.opendaylight.controller.sal.flowprogrammer.IPluginOutFlowProgrammerService
20 import org.opendaylight.controller.sal.utils.Status
21 import org.opendaylight.controller.sal.utils.StatusCode
22 import org.opendaylight.controller.clustering.services.CacheExistException
23 import org.opendaylight.controller.clustering.services.IClusterGlobalServices
24 import org.opendaylight.controller.clustering.services.IClusterServices
25
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowAdded
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowRemoved
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SwitchFlowRemoved
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowUpdated
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowListener
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowService
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.NodeErrorNotification
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.NodeExperimenterErrorNotification
34 import org.opendaylight.yangtools.yang.common.RpcResult
35 import org.slf4j.LoggerFactory
36
37 import org.opendaylight.controller.sal.binding.api.data.DataBrokerService
38 import org.opendaylight.controller.md.sal.common.api.TransactionStatus
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey
45 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId
47
48
49 import static extension org.opendaylight.controller.sal.compatibility.MDFlowMapping.*
50
51 import static extension org.opendaylight.controller.sal.compatibility.NodeMapping.*
52 import static extension org.opendaylight.controller.sal.compatibility.ToSalConversionsUtils.*
53
/**
 * Adapter that implements the AD-SAL {@link IPluginInFlowProgrammerService}
 * on top of the MD-SAL configuration data store.
 *
 * Each AD-SAL {@link Flow} is associated with a randomly generated {@link UUID}
 * which is used as the identifier of the corresponding MD-SAL flow. The
 * association is kept in a cluster-wide cache when cluster services are
 * available, and in the local {@code flowToFlowId} map otherwise.
 *
 * The class also listens to MD-SAL flow notifications ({@link SalFlowListener});
 * only {@code FlowRemoved} is forwarded to the AD-SAL publisher, every other
 * notification is ignored.
 */
class FlowProgrammerAdapter implements IPluginInFlowProgrammerService, SalFlowListener {

    private static val LOG = LoggerFactory.getLogger(FlowProgrammerAdapter);

    /** Name of the clustered cache holding the AD-SAL flow to flow-id mapping. */
    private static val CACHE_NAME = "flowprogrammeradapter.flowtoid";

    // NOTE(review): not referenced anywhere in this class; presumably injected
    // for external users of the generated getter - confirm before removing.
    @Property
    private SalFlowService delegate;

    /** Broker used to open the transactions that write/remove flow configuration. */
    @Property
    private DataBrokerService dataBrokerService;

    /** AD-SAL publisher that receives flow-removed callbacks. */
    @Property
    private IPluginOutFlowProgrammerService flowProgrammerPublisher;

    /** Optional cluster services; when null the local map below is used instead. */
    @Property
    private IClusterGlobalServices clusterGlobalServices;

    /** Local fallback mapping used when no cluster services are available. */
    @Property
    private Map<Flow, UUID> flowToFlowId = new ConcurrentHashMap<Flow, UUID>();

    /**
     * Adds the flow and blocks until the underlying transaction completes.
     *
     * @return SUCCESS or INTERNALERROR depending on the transaction outcome
     */
    override addFlow(Node node, Flow flow) {
        return toFutureStatus(internalAddFlowAsync(node, flow, 0));
    }

    /**
     * Replaces {@code oldFlow} with {@code newFlow} and blocks until the
     * underlying transaction completes.
     *
     * @throws IllegalArgumentException if {@code oldFlow} has no known flow id
     */
    override modifyFlow(Node node, Flow oldFlow, Flow newFlow) {
        return toFutureStatus(internalModifyFlowAsync(node, oldFlow, newFlow, 0));
    }

    /**
     * Removes the flow and blocks until the underlying transaction completes.
     *
     * @throws IllegalArgumentException if {@code flow} has no known flow id
     */
    override removeFlow(Node node, Flow flow) {
        return toFutureStatus(internalRemoveFlowAsync(node, flow, 0));
    }

    /**
     * Submits the flow without waiting for the transaction result.
     *
     * @return always SUCCESS once the transaction has been submitted
     */
    override addFlowAsync(Node node, Flow flow, long rid) {
        internalAddFlowAsync(node, flow, rid);
        return toStatus(true);
    }

    /**
     * Submits the modification without waiting for the transaction result.
     *
     * @return always SUCCESS once the transaction has been submitted
     */
    override modifyFlowAsync(Node node, Flow oldFlow, Flow newFlow, long rid) {
        internalModifyFlowAsync(node, oldFlow, newFlow, rid);
        return toStatus(true);
    }

    /**
     * Submits the removal without waiting for the transaction result.
     *
     * @return always SUCCESS once the transaction has been submitted
     */
    override removeFlowAsync(Node node, Flow flow, long rid) {
        internalRemoveFlowAsync(node, flow, rid);
        return toStatus(true);
    }

    /** Does nothing and reports SUCCESS. */
    override removeAllFlows(Node node) {
        // I know this looks like a cop-out... but it's exactly what the legacy OFplugin did
        return new Status(StatusCode.SUCCESS);
    }

    /** Not implemented; returns null. */
    override syncSendBarrierMessage(Node node) {

        // FIXME: Update YANG model
        return null;
    }

    /** Not implemented; returns null. */
    override asyncSendBarrierMessage(Node node) {

        // FIXME: Update YANG model
        return null;
    }

    /** Maps a success flag to SUCCESS / INTERNALERROR. */
    private static def Status toStatus(boolean successful) {
        if (successful) {
            return new Status(StatusCode.SUCCESS);
        }
        return new Status(StatusCode.INTERNALERROR);
    }

    /** Converts an RPC result to an AD-SAL status based on {@code isSuccessful()}. */
    public static def Status toStatus(RpcResult<?> result) {
        return toStatus(result.isSuccessful());
    }

    /** Logs the interruption, restores the interrupt flag and reports INTERNALERROR. */
    private static dispatch def Status processException(InterruptedException e) {
        LOG.error("Interruption occured during processing flow",e);
        // Re-assert the interrupt so that code further up the stack can still observe it.
        Thread.currentThread().interrupt();
        return new Status(StatusCode.INTERNALERROR);
    }

    /** Logs the cause of the failed execution and reports INTERNALERROR. */
    private static dispatch def Status processException(ExecutionException e) {
        LOG.error("Execution exception occured during processing flow",e.cause);
        return new Status(StatusCode.INTERNALERROR);
    }

    /** Any other exception is rethrown wrapped in a RuntimeException. */
    private static dispatch def Status processException(Exception e) {
        throw new RuntimeException(e);
    }

    override onFlowAdded(FlowAdded notification) {
        // NOOP : Not supported by AD SAL
    }

    /**
     * Forwards the removal to the AD-SAL publisher, provided the notification
     * carries a node reference that can be translated to an AD-SAL node.
     */
    override onFlowRemoved(FlowRemoved notification) {
        if (notification != null && notification.node != null) {
            val adNode = notification.node.toADNode
            if (adNode != null) {
                flowProgrammerPublisher.flowRemoved(adNode, notification.toFlow(adNode));
            }
        }
    }

    override onFlowUpdated(FlowUpdated notification) {
        // NOOP : Not supported by AD SAL
    }

    override onSwitchFlowRemoved(SwitchFlowRemoved notification) {
        // NOOP : Not supported by AD SAL
    }

    override onNodeErrorNotification(NodeErrorNotification notification) {
        // NOOP : Not supported by AD SAL
    }

    override onNodeExperimenterErrorNotification(
                NodeExperimenterErrorNotification notification) {
        // NOOP : Not supported by AD SAL
    }

    /** Builds the MD-SAL node key from the AD-SAL node's id string. */
    private def NodeKey mdNodeKey(Node node) {
        return new NodeKey(new NodeId(node.getNodeIDString()));
    }

    /** Builds the path Nodes/Node[nodeKey]/FlowCapableNode/Table[tableId]/Flow[id]. */
    private def flowConfigPath(NodeKey nodeKey, org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow flow) {
        return InstanceIdentifier.builder(Nodes)
                .child(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node, nodeKey)
                .augmentation(FlowCapableNode)
                .child(Table, new TableKey(flow.getTableId()))
                .child(org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow, new FlowKey(flow.id))
                .build;
    }

    /** Puts the MD-SAL flow into configuration data and commits the transaction. */
    private def Future<RpcResult<TransactionStatus>> writeFlowAsync(org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow flow, NodeKey nodeKey){
        val modification = this._dataBrokerService.beginTransaction();
        modification.putConfigurationData(flowConfigPath(nodeKey, flow), flow);
        return modification.commit();
    }

    /**
     * Registers a fresh flow id for {@code flow} and writes the flow.
     * If the flow is already known it is first removed (synchronously) and
     * the add is then retried. The {@code rid} argument is currently unused.
     */
    private def Future<RpcResult<TransactionStatus>> internalAddFlowAsync(Node node, Flow flow, long rid){
        val cache = getCache();
        if (cache.get(flow) != null) {
            // Already installed: drop the previous instance, then start over.
            removeFlow(node, flow);
            return internalAddFlowAsync(node, flow, rid);
        }

        val flowId = UUID.randomUUID();
        cache.put(flow, flowId);

        return writeFlowAsync(flow.toMDFlow(flowId.toString()), mdNodeKey(node));
    }

    /**
     * Re-associates the id of {@code oldFlow} with {@code newFlow} and writes
     * the new flow under that id. The {@code rid} argument is currently unused.
     *
     * @throws IllegalArgumentException if {@code oldFlow} has no known flow id
     */
    private def Future<RpcResult<TransactionStatus>> internalModifyFlowAsync(Node node, Flow oldFlow, Flow newFlow, long rid) {
        val cache = getCache();
        val flowId = cache.remove(oldFlow);
        if (flowId == null) {
            throw new IllegalArgumentException("oldFlow is unknown");
        }

        cache.put(newFlow, flowId);
        return writeFlowAsync(newFlow.toMDFlow(flowId.toString()), mdNodeKey(node));
    }

    /**
     * Forgets the id of {@code adflow} and removes the corresponding flow from
     * configuration data. The {@code rid} argument is currently unused.
     *
     * @throws IllegalArgumentException if {@code adflow} has no known flow id
     */
    private def Future<RpcResult<TransactionStatus>> internalRemoveFlowAsync(Node node, Flow adflow, long rid){
        val flowId = getCache().remove(adflow);
        if (flowId == null) {
            throw new IllegalArgumentException("adflow is unknown");
        }
        val flow = adflow.toMDFlow(flowId.toString());
        val modification = this._dataBrokerService.beginTransaction();
        modification.removeConfigurationData(flowConfigPath(mdNodeKey(node), flow));
        return modification.commit();
    }

    /**
     * Waits for the transaction result and converts it to an AD-SAL status.
     * Interrupted and failed executions are logged and reported as
     * INTERNALERROR; any other exception is rethrown as a RuntimeException.
     */
    private def Status toFutureStatus(Future<RpcResult<TransactionStatus>> future){
        try {
            return toStatus(future.get());
        } catch (Exception e) {
            // processException is a dispatch method: the handler is selected
            // by the runtime type of the exception.
            return processException(e);
        }
    }

    /**
     * Returns the map holding the flow to flow-id association.
     *
     * Without cluster services the local {@code flowToFlowId} map is used so
     * that ids registered by an add are still available to a later modify or
     * remove. Otherwise the clustered cache is looked up and created on demand.
     */
    private def Map<Flow, UUID> getCache(){
        if (clusterGlobalServices == null) {
            return flowToFlowId;
        }

        var cache = clusterGlobalServices.getCache(CACHE_NAME);

        if (cache == null) {
            try {
                cache = clusterGlobalServices.createCache(CACHE_NAME, EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
            } catch (CacheExistException e) {
                // Somebody else created it between our lookup and createCache.
                cache = clusterGlobalServices.getCache(CACHE_NAME);
            }
        }
        return cache as Map<Flow, UUID>;

    }

}