2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.sal.compatibility
12 import java.util.concurrent.ExecutionException
13 import java.util.concurrent.ConcurrentHashMap
14 import java.util.concurrent.Future
15 import java.util.EnumSet
16 import org.opendaylight.controller.sal.core.Node
17 import org.opendaylight.controller.sal.flowprogrammer.Flow
18 import org.opendaylight.controller.sal.flowprogrammer.IPluginInFlowProgrammerService
19 import org.opendaylight.controller.sal.flowprogrammer.IPluginOutFlowProgrammerService
20 import org.opendaylight.controller.sal.utils.Status
21 import org.opendaylight.controller.sal.utils.StatusCode
22 import org.opendaylight.controller.clustering.services.CacheExistException
23 import org.opendaylight.controller.clustering.services.IClusterGlobalServices
24 import org.opendaylight.controller.clustering.services.IClusterServices
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowAdded
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowRemoved
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SwitchFlowRemoved
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowUpdated
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowListener
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowService
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.NodeErrorNotification
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.NodeExperimenterErrorNotification
34 import org.opendaylight.yangtools.yang.common.RpcResult
35 import org.slf4j.LoggerFactory
37 import org.opendaylight.controller.sal.binding.api.data.DataBrokerService
38 import org.opendaylight.controller.md.sal.common.api.TransactionStatus
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey
45 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId
49 import static extension org.opendaylight.controller.sal.compatibility.MDFlowMapping.*
51 import static extension org.opendaylight.controller.sal.compatibility.NodeMapping.*
52 import static extension org.opendaylight.controller.sal.compatibility.ToSalConversionsUtils.*
54 class FlowProgrammerAdapter implements IPluginInFlowProgrammerService, SalFlowListener {
56 private static val LOG = LoggerFactory.getLogger(FlowProgrammerAdapter);
57 private static val CACHE_NAME = "flowprogrammeradapter.flowtoid";
60 private SalFlowService delegate;
63 private DataBrokerService dataBrokerService;
66 private IPluginOutFlowProgrammerService flowProgrammerPublisher;
69 private IClusterGlobalServices clusterGlobalServices;
73 private Map<Flow, UUID> flowToFlowId = new ConcurrentHashMap<Flow, UUID>();
76 override addFlow(Node node, Flow flow) {
77 return toFutureStatus(internalAddFlowAsync(node,flow,0));
80 override modifyFlow(Node node, Flow oldFlow, Flow newFlow) {
81 return toFutureStatus(internalModifyFlowAsync(node, oldFlow,newFlow,0));
84 override removeFlow(Node node, Flow flow) {
85 return toFutureStatus(internalRemoveFlowAsync(node, flow,0));
88 override addFlowAsync(Node node, Flow flow, long rid) {
89 internalAddFlowAsync(node, flow, rid);
90 return toStatus(true);
93 override modifyFlowAsync(Node node, Flow oldFlow, Flow newFlow, long rid) {
94 internalModifyFlowAsync(node, oldFlow, newFlow, rid);
95 return toStatus(true);
98 override removeFlowAsync(Node node, Flow flow, long rid) {
99 internalRemoveFlowAsync(node, flow, rid);
100 return toStatus(true);
103 override removeAllFlows(Node node) {
104 // I know this looks like a copout... but its exactly what the legacy OFplugin did
105 return new Status(StatusCode.SUCCESS);
108 override syncSendBarrierMessage(Node node) {
110 // FIXME: Update YANG model
114 override asyncSendBarrierMessage(Node node) {
116 // FIXME: Update YANG model
120 private static def toStatus(boolean successful) {
122 return new Status(StatusCode.SUCCESS);
124 return new Status(StatusCode.INTERNALERROR);
128 public static def toStatus(RpcResult<?> result) {
129 return toStatus(result.isSuccessful());
132 private static dispatch def Status processException(InterruptedException e) {
133 LOG.error("Interruption occured during processing flow",e);
134 return new Status(StatusCode.INTERNALERROR);
137 private static dispatch def Status processException(ExecutionException e) {
138 LOG.error("Execution exception occured during processing flow",e.cause);
139 return new Status(StatusCode.INTERNALERROR);
142 private static dispatch def Status processException(Exception e) {
143 throw new RuntimeException(e);
146 override onFlowAdded(FlowAdded notification) {
147 // NOOP : Not supported by AD SAL
150 override onFlowRemoved(FlowRemoved notification) {
151 if(notification != null && notification.node != null) {
152 val adNode = notification.node.toADNode
154 flowProgrammerPublisher.flowRemoved(adNode,notification.toFlow(adNode));
159 override onFlowUpdated(FlowUpdated notification) {
160 // NOOP : Not supported by AD SAL
163 override onSwitchFlowRemoved(SwitchFlowRemoved notification) {
164 // NOOP : Not supported by AD SAL
167 override onNodeErrorNotification(NodeErrorNotification notification) {
168 // NOOP : Not supported by AD SAL
171 override onNodeExperimenterErrorNotification(
172 NodeExperimenterErrorNotification notification) {
173 // NOOP : Not supported by AD SAL
176 private def Future<RpcResult<TransactionStatus>> writeFlowAsync(org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow flow, NodeKey nodeKey){
177 val modification = this._dataBrokerService.beginTransaction();
178 val flowPath = InstanceIdentifier.builder(Nodes)
179 .child(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node, nodeKey)
180 .augmentation(FlowCapableNode)
181 .child(Table, new TableKey(flow.getTableId()))
182 .child(org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow, new FlowKey(flow.id))
184 modification.putConfigurationData(flowPath, flow);
185 return modification.commit();
188 private def Future<RpcResult<TransactionStatus>> internalAddFlowAsync(Node node, Flow flow, long rid){
189 var flowId = getCache().get(flow);
191 removeFlow(node, flow);
192 return internalAddFlowAsync(node, flow, rid);
195 flowId = UUID.randomUUID();
196 getCache().put(flow, flowId);
198 return writeFlowAsync(flow.toMDFlow(flowId.toString()), new NodeKey(new NodeId(node.getNodeIDString())));
201 private def Future<RpcResult<TransactionStatus>> internalModifyFlowAsync(Node node, Flow oldFlow, Flow newFlow, long rid) {
202 var flowId = getCache().remove(oldFlow);
204 LOG.error("oldFlow not found in cache : " + oldFlow.hashCode);
205 flowId = UUID.randomUUID();
206 getCache().put(oldFlow, flowId);
209 getCache().put(newFlow, flowId);
210 return writeFlowAsync(newFlow.toMDFlow(flowId.toString()), new NodeKey(new NodeId(node.getNodeIDString())));
214 private def Future<RpcResult<TransactionStatus>> internalRemoveFlowAsync(Node node, Flow adflow, long rid){
215 val flowId = getCache().remove(adflow);
217 //throw new IllegalArgumentException("adflow not found in cache : " + adflow.hashCode);
218 LOG.error("adflow not found in cache : " + adflow.hashCode);
221 val flow = adflow.toMDFlow(flowId.toString());
222 val modification = this._dataBrokerService.beginTransaction();
223 val flowPath = InstanceIdentifier.builder(Nodes)
224 .child(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node, new NodeKey(new NodeId(node.getNodeIDString())))
225 .augmentation(FlowCapableNode)
226 .child(Table, new TableKey(flow.getTableId()))
227 .child(org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow, new FlowKey(flow.id))
229 modification.removeConfigurationData(flowPath);
230 return modification.commit();
233 private def toFutureStatus(Future<RpcResult<TransactionStatus>> future){
235 return toStatus(true);
239 val result = future.get();
240 return toStatus(result);
241 } catch (InterruptedException e) {
242 return processException(e);
243 } catch (ExecutionException e) {
244 return processException(e);
245 } catch (Exception e){
248 return toStatus(false);
251 private def Map<Flow, UUID> getCache(){
252 if(clusterGlobalServices == null){
253 return new ConcurrentHashMap<Flow, UUID>();
256 var cache = clusterGlobalServices.getCache(CACHE_NAME);
260 cache = clusterGlobalServices.createCache(CACHE_NAME, EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
261 } catch (CacheExistException e) {
262 cache = clusterGlobalServices.getCache(CACHE_NAME);
265 return cache as Map<Flow, UUID>;