From 94cfd12df400bc76ef1fd6365903129cc3f94777 Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Wed, 28 May 2014 11:59:53 +0200 Subject: [PATCH] BUG-625: migrate FlowProgrammerAdapter This migrates FlowProgrammerAdapter from xtend to pure Java, cleaning it up in the process. A bunch of possible omissions/errors are marked with FIXME for someone to follow up. Change-Id: I057449bd7a569a75e30182493727dbc200f83e62 Signed-off-by: Robert Varga --- .../compatibility/FlowProgrammerAdapter.java | 302 ++++++++++++++++++ .../compatibility/FlowProgrammerAdapter.xtend | 269 ---------------- 2 files changed, 302 insertions(+), 269 deletions(-) create mode 100644 opendaylight/md-sal/compatibility/sal-compatibility/src/main/java/org/opendaylight/controller/sal/compatibility/FlowProgrammerAdapter.java delete mode 100644 opendaylight/md-sal/compatibility/sal-compatibility/src/main/java/org/opendaylight/controller/sal/compatibility/FlowProgrammerAdapter.xtend diff --git a/opendaylight/md-sal/compatibility/sal-compatibility/src/main/java/org/opendaylight/controller/sal/compatibility/FlowProgrammerAdapter.java b/opendaylight/md-sal/compatibility/sal-compatibility/src/main/java/org/opendaylight/controller/sal/compatibility/FlowProgrammerAdapter.java new file mode 100644 index 0000000000..e5a9d3e5db --- /dev/null +++ b/opendaylight/md-sal/compatibility/sal-compatibility/src/main/java/org/opendaylight/controller/sal/compatibility/FlowProgrammerAdapter.java @@ -0,0 +1,302 @@ +/** + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.sal.compatibility; + +import java.util.EnumSet; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import org.opendaylight.controller.clustering.services.CacheConfigException; +import org.opendaylight.controller.clustering.services.CacheExistException; +import org.opendaylight.controller.clustering.services.IClusterGlobalServices; +import org.opendaylight.controller.clustering.services.IClusterServices.cacheMode; +import org.opendaylight.controller.md.sal.common.api.TransactionStatus; +import org.opendaylight.controller.sal.binding.api.data.DataBrokerService; +import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction; +import org.opendaylight.controller.sal.core.ConstructionException; +import org.opendaylight.controller.sal.core.Node; +import org.opendaylight.controller.sal.flowprogrammer.Flow; +import org.opendaylight.controller.sal.flowprogrammer.IPluginInFlowProgrammerService; +import org.opendaylight.controller.sal.flowprogrammer.IPluginOutFlowProgrammerService; +import org.opendaylight.controller.sal.utils.Status; +import org.opendaylight.controller.sal.utils.StatusCode; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowAdded; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowRemoved; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowUpdated; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.NodeErrorNotification; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.NodeExperimenterErrorNotification; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowListener; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowService; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SwitchFlowRemoved; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey; +import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; +import org.opendaylight.yangtools.yang.common.RpcResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class FlowProgrammerAdapter implements IPluginInFlowProgrammerService, SalFlowListener { + private final static Logger LOG = LoggerFactory.getLogger(FlowProgrammerAdapter.class); + + // Note: clustering services manipulate this + private final Map flowToFlowId = new ConcurrentHashMap(); + private final static String CACHE_NAME = "flowprogrammeradapter.flowtoid"; + + // These are injected via Apache DM (see ComponentActivator) + private IPluginOutFlowProgrammerService flowProgrammerPublisher; + private IClusterGlobalServices clusterGlobalServices; + private DataBrokerService dataBrokerService; + private SalFlowService delegate; + + public SalFlowService getDelegate() { + return this.delegate; + } + + public void setDelegate(final SalFlowService delegate) { + this.delegate = delegate; + } + + public DataBrokerService getDataBrokerService() { + return this.dataBrokerService; + } + + public void setDataBrokerService(final DataBrokerService dataBrokerService) { + this.dataBrokerService = dataBrokerService; + } + + public IPluginOutFlowProgrammerService getFlowProgrammerPublisher() { + return this.flowProgrammerPublisher; + } + + public void setFlowProgrammerPublisher(final IPluginOutFlowProgrammerService flowProgrammerPublisher) { + this.flowProgrammerPublisher = flowProgrammerPublisher; + } + + public IClusterGlobalServices getClusterGlobalServices() { + return this.clusterGlobalServices; + } + + public void setClusterGlobalServices(final IClusterGlobalServices clusterGlobalServices) { + this.clusterGlobalServices = clusterGlobalServices; + } + + @Override + public Status addFlow(final Node node, final Flow flow) { + return toFutureStatus(internalAddFlowAsync(node, flow, 0)); + } + + @Override + public Status modifyFlow(final Node node, final Flow oldFlow, final Flow newFlow) { + return toFutureStatus(internalModifyFlowAsync(node, oldFlow, newFlow, 0)); + } + + @Override + public Status removeFlow(final Node node, final Flow flow) { + return toFutureStatus(internalRemoveFlowAsync(node, flow, 0)); + } + + @Override + public Status addFlowAsync(final Node node, final Flow flow, final long rid) { + // FIXME is this correct? What if the future fails? + this.internalAddFlowAsync(node, flow, rid); + return FlowProgrammerAdapter.toStatus(true); + } + + @Override + public Status modifyFlowAsync(final Node node, final Flow oldFlow, final Flow newFlow, final long rid) { + // FIXME is this correct? What if the future fails? + this.internalModifyFlowAsync(node, oldFlow, newFlow, rid); + return FlowProgrammerAdapter.toStatus(true); + } + + @Override + public Status removeFlowAsync(final Node node, final Flow flow, final long rid) { + // FIXME is this correct? What if the future fails? + this.internalRemoveFlowAsync(node, flow, rid); + return FlowProgrammerAdapter.toStatus(true); + } + + @Override + public Status removeAllFlows(final Node node) { + // FIXME: unfinished? + return new Status(StatusCode.SUCCESS); + } + + @Override + public Status syncSendBarrierMessage(final Node node) { + // FIXME: unfinished? + return null; + } + + @Override + public Status asyncSendBarrierMessage(final Node node) { + // FIXME: unfinished? + return null; + } + + private static Status toStatus(final boolean successful) { + return new Status(successful ? StatusCode.SUCCESS : StatusCode.INTERNALERROR); + } + + public static Status toStatus(final RpcResult result) { + return toStatus(result.isSuccessful()); + } + + @Override + public void onFlowAdded(final FlowAdded notification) { + // FIXME: unfinished? + } + + @Override + public void onFlowRemoved(final FlowRemoved notification) { + if (notification == null) { + return; + } + + final NodeRef node = notification.getNode(); + if (node == null) { + LOG.debug("Notification {} has not node, ignoring it", notification); + return; + } + + Node adNode; + try { + adNode = NodeMapping.toADNode(notification.getNode()); + } catch (ConstructionException e) { + LOG.warn("Failed to construct AD node for {}, ignoring notification", node, e); + return; + } + flowProgrammerPublisher.flowRemoved(adNode, ToSalConversionsUtils.toFlow(notification, adNode)); + } + + @Override + public void onFlowUpdated(final FlowUpdated notification) { + // FIXME: unfinished? + } + + @Override + public void onSwitchFlowRemoved(final SwitchFlowRemoved notification) { + // FIXME: unfinished? + } + + @Override + public void onNodeErrorNotification(final NodeErrorNotification notification) { + // FIXME: unfinished? + } + + @Override + public void onNodeExperimenterErrorNotification(final NodeExperimenterErrorNotification notification) { + // FIXME: unfinished? + } + + private static final InstanceIdentifier flowPath( + final org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow flow, final NodeKey nodeKey) { + return InstanceIdentifier.builder(Nodes.class) + .child(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node.class, nodeKey) + .augmentation(FlowCapableNode.class) + .child(Table.class, new TableKey(flow.getTableId())) + .child(org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow.class, new FlowKey(flow.getId())) + .toInstance(); + } + + private Future> writeFlowAsync(final org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow flow, final NodeKey nodeKey) { + final DataModificationTransaction modification = this.dataBrokerService.beginTransaction(); + modification.putConfigurationData(flowPath(flow, nodeKey), flow); + return modification.commit(); + } + + private Future> internalAddFlowAsync(final Node node, final Flow flow, final long rid) { + final Map cache = this.getCache(); + UUID flowId = cache.get(flow); + if (flowId != null) { + this.removeFlow(node, flow); + } + + flowId = UUID.randomUUID(); + cache.put(flow, flowId); + return this.writeFlowAsync(MDFlowMapping.toMDFlow(flow, flowId.toString()), new NodeKey(new NodeId(node.getNodeIDString()))); + } + + private Future> internalModifyFlowAsync(final Node node, final Flow oldFlow, final Flow newFlow, final long rid) { + final Map cache = this.getCache(); + + UUID flowId = cache.remove(oldFlow); + if (flowId == null) { + flowId = UUID.randomUUID(); + cache.put(oldFlow, flowId); + LOG.warn("Could not find flow {} in cache, assigned new ID {}", oldFlow.hashCode(), flowId); + } + + cache.put(newFlow, flowId); + return this.writeFlowAsync(MDFlowMapping.toMDFlow(newFlow, flowId.toString()), new NodeKey(new NodeId(node.getNodeIDString()))); + } + + private Future> internalRemoveFlowAsync(final Node node, final Flow adflow, final long rid) { + final Map cache = this.getCache(); + + final UUID flowId = cache.remove(adflow); + if (flowId == null) { + LOG.warn("Could not find flow {} in cache, nothing to do", adflow.hashCode()); + return null; + } + + final org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow flow = MDFlowMapping.toMDFlow(adflow, flowId.toString()); + final DataModificationTransaction modification = this.dataBrokerService.beginTransaction(); + modification.removeConfigurationData(flowPath(flow, new NodeKey(new NodeId(node.getNodeIDString())))); + return modification.commit(); + } + + private static Status toFutureStatus(final Future> future) { + if (future == null) { + // FIXME: really? + return FlowProgrammerAdapter.toStatus(true); + } + + try { + final RpcResult result = future.get(); + return FlowProgrammerAdapter.toStatus(result); + } catch (final InterruptedException e) { + FlowProgrammerAdapter.LOG.error("Interrupted while processing flow", e); + } catch (ExecutionException e) { + FlowProgrammerAdapter.LOG.error("Failed to process flow", e); + } + + return new Status(StatusCode.INTERNALERROR); + } + + @SuppressWarnings("unchecked") + private Map getCache() { + final IClusterGlobalServices cgs = getClusterGlobalServices(); + if (cgs == null) { + return new ConcurrentHashMap(); + } + + Map cache = (Map) cgs.getCache(FlowProgrammerAdapter.CACHE_NAME); + if (cache != null) { + return cache; + } + + try { + return (Map) cgs.createCache(CACHE_NAME, EnumSet.of(cacheMode.TRANSACTIONAL)); + } catch (CacheExistException e) { + return (Map) cgs.getCache(CACHE_NAME); + } catch (CacheConfigException e) { + throw new IllegalStateException("Unexpected cache configuration problem", e); + } + } + +} diff --git a/opendaylight/md-sal/compatibility/sal-compatibility/src/main/java/org/opendaylight/controller/sal/compatibility/FlowProgrammerAdapter.xtend b/opendaylight/md-sal/compatibility/sal-compatibility/src/main/java/org/opendaylight/controller/sal/compatibility/FlowProgrammerAdapter.xtend deleted file mode 100644 index 8a0874ee31..0000000000 --- a/opendaylight/md-sal/compatibility/sal-compatibility/src/main/java/org/opendaylight/controller/sal/compatibility/FlowProgrammerAdapter.xtend +++ /dev/null @@ -1,269 +0,0 @@ -/* - * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ -package org.opendaylight.controller.sal.compatibility - -import java.util.Map -import java.util.UUID -import java.util.concurrent.ExecutionException -import java.util.concurrent.ConcurrentHashMap -import java.util.concurrent.Future -import java.util.EnumSet -import org.opendaylight.controller.sal.core.Node -import org.opendaylight.controller.sal.flowprogrammer.Flow -import org.opendaylight.controller.sal.flowprogrammer.IPluginInFlowProgrammerService -import org.opendaylight.controller.sal.flowprogrammer.IPluginOutFlowProgrammerService -import org.opendaylight.controller.sal.utils.Status -import org.opendaylight.controller.sal.utils.StatusCode -import org.opendaylight.controller.clustering.services.CacheExistException -import org.opendaylight.controller.clustering.services.IClusterGlobalServices -import org.opendaylight.controller.clustering.services.IClusterServices - -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowAdded -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowRemoved -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SwitchFlowRemoved -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowUpdated -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowListener -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowService -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.NodeErrorNotification -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.NodeExperimenterErrorNotification -import org.opendaylight.yangtools.yang.common.RpcResult -import org.slf4j.LoggerFactory - -import org.opendaylight.controller.sal.binding.api.data.DataBrokerService -import org.opendaylight.controller.md.sal.common.api.TransactionStatus -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey -import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey -import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes -import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey -import org.opendaylight.yangtools.yang.binding.InstanceIdentifier -import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId - - -import static extension org.opendaylight.controller.sal.compatibility.MDFlowMapping.* - -import static extension org.opendaylight.controller.sal.compatibility.NodeMapping.* -import static extension org.opendaylight.controller.sal.compatibility.ToSalConversionsUtils.* - -class FlowProgrammerAdapter implements IPluginInFlowProgrammerService, SalFlowListener { - - private static val LOG = LoggerFactory.getLogger(FlowProgrammerAdapter); - private static val CACHE_NAME = "flowprogrammeradapter.flowtoid"; - - @Property - private SalFlowService delegate; - - @Property - private DataBrokerService dataBrokerService; - - @Property - private IPluginOutFlowProgrammerService flowProgrammerPublisher; - - @Property - private IClusterGlobalServices clusterGlobalServices; - - - @Property - private Map flowToFlowId = new ConcurrentHashMap(); - - - override addFlow(Node node, Flow flow) { - return toFutureStatus(internalAddFlowAsync(node,flow,0)); - } - - override modifyFlow(Node node, Flow oldFlow, Flow newFlow) { - return toFutureStatus(internalModifyFlowAsync(node, oldFlow,newFlow,0)); - } - - override removeFlow(Node node, Flow flow) { - return toFutureStatus(internalRemoveFlowAsync(node, flow,0)); - } - - override addFlowAsync(Node node, Flow flow, long rid) { - internalAddFlowAsync(node, flow, rid); - return toStatus(true); - } - - override modifyFlowAsync(Node node, Flow oldFlow, Flow newFlow, long rid) { - internalModifyFlowAsync(node, oldFlow, newFlow, rid); - return toStatus(true); - } - - override removeFlowAsync(Node node, Flow flow, long rid) { - internalRemoveFlowAsync(node, flow, rid); - return toStatus(true); - } - - override removeAllFlows(Node node) { - // I know this looks like a copout... but its exactly what the legacy OFplugin did - return new Status(StatusCode.SUCCESS); - } - - override syncSendBarrierMessage(Node node) { - - // FIXME: Update YANG model - return null; - } - - override asyncSendBarrierMessage(Node node) { - - // FIXME: Update YANG model - return null; - } - - private static def toStatus(boolean successful) { - if (successful) { - return new Status(StatusCode.SUCCESS); - } else { - return new Status(StatusCode.INTERNALERROR); - } - } - - public static def toStatus(RpcResult result) { - return toStatus(result.isSuccessful()); - } - - private static dispatch def Status processException(InterruptedException e) { - LOG.error("Interruption occured during processing flow",e); - return new Status(StatusCode.INTERNALERROR); - } - - private static dispatch def Status processException(ExecutionException e) { - LOG.error("Execution exception occured during processing flow",e.cause); - return new Status(StatusCode.INTERNALERROR); - } - - private static dispatch def Status processException(Exception e) { - throw new RuntimeException(e); - } - - override onFlowAdded(FlowAdded notification) { - // NOOP : Not supported by AD SAL - } - - override onFlowRemoved(FlowRemoved notification) { - if(notification != null && notification.node != null) { - val adNode = notification.node.toADNode - if(adNode != null) { - flowProgrammerPublisher.flowRemoved(adNode,notification.toFlow(adNode)); - } - } - } - - override onFlowUpdated(FlowUpdated notification) { - // NOOP : Not supported by AD SAL - } - - override onSwitchFlowRemoved(SwitchFlowRemoved notification) { - // NOOP : Not supported by AD SAL - } - - override onNodeErrorNotification(NodeErrorNotification notification) { - // NOOP : Not supported by AD SAL - } - - override onNodeExperimenterErrorNotification( - NodeExperimenterErrorNotification notification) { - // NOOP : Not supported by AD SAL - } - - private def Future> writeFlowAsync(org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow flow, NodeKey nodeKey){ - val modification = this._dataBrokerService.beginTransaction(); - val flowPath = InstanceIdentifier.builder(Nodes) - .child(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node, nodeKey) - .augmentation(FlowCapableNode) - .child(Table, new TableKey(flow.getTableId())) - .child(org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow, new FlowKey(flow.id)) - .build; - modification.putConfigurationData(flowPath, flow); - return modification.commit(); - } - - private def Future> internalAddFlowAsync(Node node, Flow flow, long rid){ - var flowId = getCache().get(flow); - if(flowId != null) { - removeFlow(node, flow); - return internalAddFlowAsync(node, flow, rid); - } - - flowId = UUID.randomUUID(); - getCache().put(flow, flowId); - - return writeFlowAsync(flow.toMDFlow(flowId.toString()), new NodeKey(new NodeId(node.getNodeIDString()))); - } - - private def Future> internalModifyFlowAsync(Node node, Flow oldFlow, Flow newFlow, long rid) { - var flowId = getCache().remove(oldFlow); - if(flowId == null){ - LOG.error("oldFlow not found in cache : " + oldFlow.hashCode); - flowId = UUID.randomUUID(); - getCache().put(oldFlow, flowId); - } - - getCache().put(newFlow, flowId); - return writeFlowAsync(newFlow.toMDFlow(flowId.toString()), new NodeKey(new NodeId(node.getNodeIDString()))); - } - - - private def Future> internalRemoveFlowAsync(Node node, Flow adflow, long rid){ - val flowId = getCache().remove(adflow); - if(flowId == null){ - //throw new IllegalArgumentException("adflow not found in cache : " + adflow.hashCode); - LOG.error("adflow not found in cache : " + adflow.hashCode); - return null; - } - val flow = adflow.toMDFlow(flowId.toString()); - val modification = this._dataBrokerService.beginTransaction(); - val flowPath = InstanceIdentifier.builder(Nodes) - .child(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node, new NodeKey(new NodeId(node.getNodeIDString()))) - .augmentation(FlowCapableNode) - .child(Table, new TableKey(flow.getTableId())) - .child(org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow, new FlowKey(flow.id)) - .build; - modification.removeConfigurationData(flowPath); - return modification.commit(); - } - - private def toFutureStatus(Future> future){ - if(future == null){ - return toStatus(true); - } - - try { - val result = future.get(); - return toStatus(result); - } catch (InterruptedException e) { - return processException(e); - } catch (ExecutionException e) { - return processException(e); - } catch (Exception e){ - processException(e); - } - return toStatus(false); - } - - private def Map getCache(){ - if(clusterGlobalServices == null){ - return new ConcurrentHashMap(); - } - - var cache = clusterGlobalServices.getCache(CACHE_NAME); - - if(cache == null) { - try { - cache = clusterGlobalServices.createCache(CACHE_NAME, EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL)); - } catch (CacheExistException e) { - cache = clusterGlobalServices.getCache(CACHE_NAME); - } - } - return cache as Map; - - } - -} -- 2.36.6