X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=openflowplugin-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fopenflowplugin%2Fimpl%2Fservices%2FSalFlowServiceImpl.java;h=e8dd4d1543bacdd18f8f932777b4f59c6e8ebe38;hb=7b5c345d51a61d0d0375eb0197683408668eb56f;hp=a3feaec73918f2628b78642b46fde25ce0dae196;hpb=3c4699582dc3bcb0e3bb854ba4f73efacdb7f333;p=openflowplugin.git diff --git a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/services/SalFlowServiceImpl.java b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/services/SalFlowServiceImpl.java index a3feaec739..e8dd4d1543 100644 --- a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/services/SalFlowServiceImpl.java +++ b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/services/SalFlowServiceImpl.java @@ -1,76 +1,192 @@ /** * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. - * + * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ package org.opendaylight.openflowplugin.impl.services; +import com.google.common.util.concurrent.AsyncFunction; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.JdkFutureAdapters; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; import java.math.BigInteger; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; -import org.opendaylight.openflowplugin.api.openflow.device.Xid; -import org.opendaylight.openflowplugin.api.openflow.md.core.SwitchConnectionDistinguisher; -import org.opendaylight.openflowplugin.api.openflow.md.core.session.IMessageDispatchService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import org.opendaylight.openflowplugin.api.openflow.device.RequestContext; import org.opendaylight.openflowplugin.api.openflow.rpc.RpcContext; +import org.opendaylight.openflowplugin.openflow.md.core.sal.OFRpcTaskFactory; +import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.FlowConvertor; +import org.opendaylight.openflowplugin.openflow.md.util.FlowCreatorUtil; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowInputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowService; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowOutput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.flow.update.OriginalFlow; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.flow.update.UpdatedFlow; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowModInputBuilder; +import org.opendaylight.yangtools.yang.binding.DataObject; +import org.opendaylight.yangtools.yang.common.RpcError; import org.opendaylight.yangtools.yang.common.RpcResult; +import org.opendaylight.yangtools.yang.common.RpcResultBuilder; import org.slf4j.Logger; public class SalFlowServiceImpl extends CommonService implements SalFlowService { private static final Logger LOG = org.slf4j.LoggerFactory.getLogger(SalFlowServiceImpl.class); - public SalFlowServiceImpl(final RpcContext rpcContext, final short version, final BigInteger datapathId, - final IMessageDispatchService service, final Xid xid, final SwitchConnectionDistinguisher cookie) { - // TODO set cookie - super(rpcContext, version, datapathId, service, xid, cookie); + // TODO set cookie somehow from - DeviceContext probably (temporary set to 0 - primary connection) + private final BigInteger connectionID = PRIMARY_CONNECTION; + + private interface Function { + Future> apply(final BigInteger IDConnection); } public SalFlowServiceImpl(final RpcContext rpcContext) { - this.rpcContext = rpcContext; + super(rpcContext); } - /* - * (non-Javadoc) - * - * @see - * org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowService#addFlow(org.opendaylight. - * yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInput) - */ @Override public Future> addFlow(final AddFlowInput input) { - return null; + return processFlow(new Function() { + @Override + public ListenableFuture> apply(final BigInteger IDConnection) { + final List ofFlowModInputs = FlowConvertor.toFlowModInputs(input, version, + datapathId); + return chainFlowMods(ofFlowModInputs, 0, IDConnection); + } + }); } - /* - * (non-Javadoc) - * - * @see - * org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowService#removeFlow(org.opendaylight - * .yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowInput) - */ @Override public Future> removeFlow(final RemoveFlowInput input) { - return null; + return processFlow(new Function() { + @Override + public Future> apply(final BigInteger IDConnection) { + final List ofFlowModInputs = FlowConvertor.toFlowModInputs(input, version, + datapathId); + return provideConnectionAdapter(IDConnection).flowMod(ofFlowModInputs.get(0).build()); + } + }); } - /* - * (non-Javadoc) - * - * @see - * org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowService#updateFlow(org.opendaylight - * .yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowInput) - */ @Override public Future> updateFlow(final UpdateFlowInput input) { - return null; + final UpdateFlowInput in = input; + final UpdatedFlow updated = in.getUpdatedFlow(); + final OriginalFlow original = in.getOriginalFlow(); + + final List allFlowMods = new ArrayList<>(); + List ofFlowModInputs; + + if (!FlowCreatorUtil.canModifyFlow(original, updated, version)) { + // We would need to remove original and add updated. + + // remove flow + final RemoveFlowInputBuilder removeflow = new RemoveFlowInputBuilder(original); + final List ofFlowRemoveInput = FlowConvertor.toFlowModInputs(removeflow.build(), + version, datapathId); + // remove flow should be the first + allFlowMods.addAll(ofFlowRemoveInput); + final AddFlowInputBuilder addFlowInputBuilder = new AddFlowInputBuilder(updated); + ofFlowModInputs = FlowConvertor.toFlowModInputs(addFlowInputBuilder.build(), version, datapathId); + } else { + ofFlowModInputs = FlowConvertor.toFlowModInputs(updated, version, datapathId); + } + + allFlowMods.addAll(ofFlowModInputs); + LOG.debug("Number of flows to push to switch: {}", allFlowMods.size()); + Collections. emptyList(); + return this. processFlow(new Function() { + @Override + public Future> apply(final BigInteger cookie) { + return chainFlowMods(allFlowMods, 0, cookie); + } + }); + } + + private Future> processFlow(final Function function) { + LOG.debug("Calling the FlowMod RPC method on MessageDispatchService"); + // use primary connection + + final RequestContext requestContext = rpcContext.createRequestContext(); + final SettableFuture> result = rpcContext.storeOrFail(requestContext); + + if (!result.isDone()) { + try { + final Future> resultFromOFLib = function.apply(connectionID); + final RpcResult rpcResult = resultFromOFLib.get(getWaitTime(), TimeUnit.MILLISECONDS); + if (!rpcResult.isSuccessful()) { + result.set(RpcResultBuilder. failed().withRpcErrors(rpcResult.getErrors()).build()); + requestContext.close(); + } + } catch (InterruptedException | ExecutionException | TimeoutException e) { + result.set(RpcResultBuilder + . failed() + .withError(RpcError.ErrorType.APPLICATION, "", + "Flow modification on device wasn't successfull.").build()); + requestContext.close(); + } catch (final Exception e) { + result.set(RpcResultBuilder. failed() + .withError(RpcError.ErrorType.APPLICATION, "", "Flow translation to OF JAVA failed.").build()); + requestContext.close(); + } + + } else { + requestContext.close(); + } + return result; + } + + /** + * Recursive helper method for + * {@link OFRpcTaskFactory#chainFlowMods(java.util.List, int, org.opendaylight.openflowplugin.openflow.md.core.sal.OFRpcTaskContext, org.opendaylight.openflowplugin.api.openflow.md.core.SwitchConnectionDistinguisher)} + * {@link OFRpcTaskFactory#createUpdateFlowTask()} to chain results of multiple flowmods. The next flowmod gets + * executed if the earlier one is successful. All the flowmods should have the same xid, in-order to cross-reference + * the notification + */ + protected ListenableFuture> chainFlowMods(final List ofFlowModInputs, + final int index, final BigInteger cookie) { + + final Future> resultFromOFLib = createResultForFlowMod(ofFlowModInputs.get(index), cookie); + + final ListenableFuture> result = JdkFutureAdapters.listenInPoolThread(resultFromOFLib); + + if (ofFlowModInputs.size() > index + 1) { + // there are more flowmods to chain + return Futures.transform(result, new AsyncFunction, RpcResult>() { + @Override + public ListenableFuture> apply(final RpcResult input) throws Exception { + if (input.isSuccessful()) { + return chainFlowMods(ofFlowModInputs, index + 1, cookie); + } else { + LOG.warn("Flowmod failed. Any chained flowmods are ignored. xid:{}", ofFlowModInputs.get(index) + .getXid()); + return Futures.immediateFuture(input); + } + } + }); + } else { + return result; + } + } + + protected Future> createResultForFlowMod(final FlowModInputBuilder flowModInput, + final BigInteger cookie) { + flowModInput.setXid(deviceContext.getNextXid().getValue()); + return provideConnectionAdapter(cookie).flowMod(flowModInput.build()); } }