2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.openflowplugin.impl.services;
10 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
11 import org.opendaylight.openflowplugin.api.openflow.device.RequestContextStack;
12 import org.opendaylight.openflowplugin.api.openflow.device.Xid;
13 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
14 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
15 import com.google.common.util.concurrent.FutureCallback;
16 import com.google.common.util.concurrent.SettableFuture;
17 import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
18 import com.google.common.base.Function;
19 import com.google.common.util.concurrent.Futures;
20 import com.google.common.util.concurrent.JdkFutureAdapters;
21 import com.google.common.util.concurrent.ListenableFuture;
22 import java.math.BigInteger;
23 import java.util.ArrayList;
24 import java.util.List;
25 import java.util.concurrent.Future;
26 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.FlowConvertor;
27 import org.opendaylight.openflowplugin.openflow.md.util.FlowCreatorUtil;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInput;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInputBuilder;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowOutput;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowInput;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowInputBuilder;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowOutput;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowService;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowInput;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowOutput;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.flow.update.OriginalFlow;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.flow.update.UpdatedFlow;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowModInputBuilder;
40 import org.opendaylight.yangtools.yang.common.RpcResult;
41 import org.slf4j.Logger;
43 public class SalFlowServiceImpl extends CommonService implements SalFlowService {
45 private static final Logger LOG = org.slf4j.LoggerFactory.getLogger(SalFlowServiceImpl.class);
48 public SalFlowServiceImpl(final RequestContextStack requestContextStack, final DeviceContext deviceContext) {
49 super(requestContextStack, deviceContext);
52 <T, F> ListenableFuture<RpcResult<T>> handleServiceCall(final BigInteger connectionID,
53 final FlowModInputBuilder flowModInputBuilder, final Function<DataCrate<T>, ListenableFuture<RpcResult<F>>> function) {
54 LOG.debug("Calling the FlowMod RPC method on MessageDispatchService");
56 final RequestContext<T> requestContext = requestContextStack.createRequestContext();
57 final SettableFuture<RpcResult<T>> result = requestContextStack.storeOrFail(requestContext);
58 final DataCrate<T> dataCrate = DataCrateBuilder.<T> builder().setiDConnection(connectionID)
59 .setRequestContext(requestContext).setFlowModInputBuilder(flowModInputBuilder).build();
61 if (!result.isDone()) {
62 final ListenableFuture<RpcResult<F>> resultFromOFLib = function.apply(dataCrate);
64 final OFJResult2RequestCtxFuture<T> OFJResult2RequestCtxFuture = new OFJResult2RequestCtxFuture<>(requestContext, deviceContext);
65 OFJResult2RequestCtxFuture.processResultFromOfJava(resultFromOFLib);
68 RequestContextUtil.closeRequstContext(requestContext);
74 public Future<RpcResult<AddFlowOutput>> addFlow(final AddFlowInput input) {
75 final List<FlowModInputBuilder> ofFlowModInputs = FlowConvertor.toFlowModInputs(input, version, datapathId);
76 return processFlowModInputBuilders(ofFlowModInputs);
80 public Future<RpcResult<RemoveFlowOutput>> removeFlow(final RemoveFlowInput input) {
82 return this.<RemoveFlowOutput, Void> handleServiceCall(PRIMARY_CONNECTION,
83 new Function<DataCrate<RemoveFlowOutput>, ListenableFuture<RpcResult<Void>>>() {
85 public ListenableFuture<RpcResult<Void>> apply(final DataCrate<RemoveFlowOutput> data) {
86 final FlowModInputBuilder ofFlowModInput = FlowConvertor.toFlowModInput(input, version,
88 return createResultForFlowMod(data, ofFlowModInput);
94 public Future<RpcResult<UpdateFlowOutput>> updateFlow(final UpdateFlowInput input) {
95 final UpdateFlowInput in = input;
96 final UpdatedFlow updated = in.getUpdatedFlow();
97 final OriginalFlow original = in.getOriginalFlow();
99 final List<FlowModInputBuilder> allFlowMods = new ArrayList<>();
100 List<FlowModInputBuilder> ofFlowModInputs;
102 if (!FlowCreatorUtil.canModifyFlow(original, updated, version)) {
103 // We would need to remove original and add updated.
106 final RemoveFlowInputBuilder removeflow = new RemoveFlowInputBuilder(original);
107 final List<FlowModInputBuilder> ofFlowRemoveInput = FlowConvertor.toFlowModInputs(removeflow.build(),
108 version, datapathId);
109 // remove flow should be the first
110 allFlowMods.addAll(ofFlowRemoveInput);
111 final AddFlowInputBuilder addFlowInputBuilder = new AddFlowInputBuilder(updated);
112 ofFlowModInputs = FlowConvertor.toFlowModInputs(addFlowInputBuilder.build(), version, datapathId);
114 ofFlowModInputs = FlowConvertor.toFlowModInputs(updated, version, datapathId);
117 allFlowMods.addAll(ofFlowModInputs);
118 return processFlowModInputBuilders(allFlowMods);
121 private <T> Future<RpcResult<T>> processFlowModInputBuilders(
122 final List<FlowModInputBuilder> ofFlowModInputs) {
123 final List<ListenableFuture<RpcResult<T>>> partialFutures = new ArrayList<>();
124 for (FlowModInputBuilder flowModInputBuilder : ofFlowModInputs) {
125 ListenableFuture<RpcResult<T>> partialFuture = handleServiceCall(PRIMARY_CONNECTION, flowModInputBuilder,
126 new Function<DataCrate<T>, ListenableFuture<RpcResult<Void>>>() {
128 public ListenableFuture<RpcResult<Void>> apply(final DataCrate<T> data) {
129 return createResultForFlowMod(data);
132 partialFutures.add(partialFuture);
135 ListenableFuture<List<RpcResult<T>>> allFutures = Futures.allAsList(partialFutures);
136 final SettableFuture<RpcResult<T>> finalFuture = SettableFuture.create();
137 Futures.addCallback(allFutures, new FutureCallback<List<RpcResult<T>>>() {
139 public void onSuccess(List<RpcResult<T>> result) {
140 for (RpcResult<T> rpcResult : result) {
141 if (rpcResult.isSuccessful()) {
142 // TODO: AddFlowOutput has getTransactionId() - shouldn't it have some value?
143 finalFuture.set(RpcResultBuilder.<T> success().build());
149 public void onFailure(Throwable t) {
150 finalFuture.set(RpcResultBuilder.<T> failed().withError(ErrorType.APPLICATION, "", t.getMessage())
158 protected <T> ListenableFuture<RpcResult<Void>> createResultForFlowMod(final DataCrate<T> data) {
159 return createResultForFlowMod(data, data.getFlowModInputBuilder()) ;
162 protected <T> ListenableFuture<RpcResult<Void>> createResultForFlowMod(final DataCrate<T> data, final FlowModInputBuilder flowModInput) {
163 final Xid xId = deviceContext.getNextXid();
164 flowModInput.setXid(xId.getValue());
165 data.getRequestContext().setXid(xId);
166 Future<RpcResult<Void>> flowModResult = provideConnectionAdapter(data.getiDConnection()).flowMod(
167 flowModInput.build());
168 return JdkFutureAdapters.listenInPoolThread(flowModResult);