Merge "PortTranslator modification"
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / services / SalFlowServiceImpl.java
1 /**
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.impl.services;
9
10 import com.google.common.base.Function;
11 import com.google.common.util.concurrent.FutureCallback;
12 import com.google.common.util.concurrent.Futures;
13 import com.google.common.util.concurrent.JdkFutureAdapters;
14 import com.google.common.util.concurrent.ListenableFuture;
15 import com.google.common.util.concurrent.SettableFuture;
16 import java.math.BigInteger;
17 import java.util.ArrayList;
18 import java.util.Collection;
19 import java.util.List;
20 import java.util.concurrent.ExecutionException;
21 import java.util.concurrent.Future;
22 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
23 import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
24 import org.opendaylight.openflowplugin.api.openflow.device.RequestContextStack;
25 import org.opendaylight.openflowplugin.api.openflow.device.Xid;
26 import org.opendaylight.openflowplugin.api.openflow.device.exception.DeviceDataException;
27 import org.opendaylight.openflowplugin.api.openflow.flow.registry.FlowDescriptor;
28 import org.opendaylight.openflowplugin.api.openflow.flow.registry.FlowHash;
29 import org.opendaylight.openflowplugin.impl.flow.registry.FlowDescriptorFactory;
30 import org.opendaylight.openflowplugin.impl.flow.registry.FlowHashFactory;
31 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.FlowConvertor;
32 import org.opendaylight.openflowplugin.openflow.md.util.FlowCreatorUtil;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowId;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInput;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInputBuilder;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowOutput;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowInput;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowInputBuilder;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowOutput;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowService;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowInput;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowOutput;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.flow.update.OriginalFlow;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.flow.update.UpdatedFlow;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowModInputBuilder;
48 import org.opendaylight.yangtools.yang.common.RpcError;
49 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
50 import org.opendaylight.yangtools.yang.common.RpcResult;
51 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
52 import org.slf4j.Logger;
53
54 public class SalFlowServiceImpl extends CommonService implements SalFlowService {
55
56     private static final Logger LOG = org.slf4j.LoggerFactory.getLogger(SalFlowServiceImpl.class);
57
58     public SalFlowServiceImpl(final RequestContextStack requestContextStack, final DeviceContext deviceContext) {
59         super(requestContextStack, deviceContext);
60     }
61
62     <T, F> ListenableFuture<RpcResult<T>> handleServiceCall(final BigInteger connectionID,
63                                                             final FlowModInputBuilder flowModInputBuilder, final Function<DataCrate<T>, ListenableFuture<RpcResult<F>>> function) {
64         LOG.debug("Calling the FlowMod RPC method on MessageDispatchService");
65
66         final RequestContext<T> requestContext = requestContextStack.createRequestContext();
67         final SettableFuture<RpcResult<T>> result = requestContextStack.storeOrFail(requestContext);
68         final DataCrate<T> dataCrate = DataCrateBuilder.<T>builder().setiDConnection(connectionID)
69                 .setRequestContext(requestContext).setFlowModInputBuilder(flowModInputBuilder).build();
70
71         if (!result.isDone()) {
72             final ListenableFuture<RpcResult<F>> resultFromOFLib = function.apply(dataCrate);
73
74             final OFJResult2RequestCtxFuture<T> OFJResult2RequestCtxFuture = new OFJResult2RequestCtxFuture<>(requestContext, deviceContext);
75             OFJResult2RequestCtxFuture.processResultFromOfJava(resultFromOFLib);
76
77         } else {
78             RequestContextUtil.closeRequstContext(requestContext);
79         }
80         return result;
81     }
82
83     @Override
84     public Future<RpcResult<AddFlowOutput>> addFlow(final AddFlowInput input) {
85         final List<FlowModInputBuilder> ofFlowModInputs = FlowConvertor.toFlowModInputs(input, version, datapathId);
86         final ListenableFuture future = processFlowModInputBuilders(ofFlowModInputs);
87         final FlowId flowId = input.getFlowRef().getValue().firstKeyOf(Flow.class, FlowKey.class).getId();
88
89         Futures.addCallback(future, new FutureCallback() {
90             @Override
91             public void onSuccess(final Object o) {
92                 FlowHash flowHash = FlowHashFactory.create(input);
93                 FlowDescriptor flowDescriptor = FlowDescriptorFactory.create(input.getTableId(), flowId);
94                 deviceContext.getDeviceFlowRegistry().store(flowHash, flowDescriptor);
95                 LOG.debug("flow add finished without error, id={}", flowId.getValue());
96             }
97
98             @Override
99             public void onFailure(final Throwable throwable) {
100                 LOG.trace("Service call for adding flows failed, id={}.", flowId.getValue(), throwable);
101             }
102         });
103
104         return future;
105     }
106
107     @Override
108     public Future<RpcResult<RemoveFlowOutput>> removeFlow(final RemoveFlowInput input) {
109
110         return this.<RemoveFlowOutput, Void>handleServiceCall(PRIMARY_CONNECTION,
111                 new Function<DataCrate<RemoveFlowOutput>, ListenableFuture<RpcResult<Void>>>() {
112                     @Override
113                     public ListenableFuture<RpcResult<Void>> apply(final DataCrate<RemoveFlowOutput> data) {
114                         final FlowModInputBuilder ofFlowModInput = FlowConvertor.toFlowModInput(input, version,
115                                 datapathId);
116                         final ListenableFuture<RpcResult<Void>> future = createResultForFlowMod(data, ofFlowModInput);
117                         Futures.addCallback(future, new FutureCallback() {
118                             @Override
119                             public void onSuccess(final Object o) {
120                                 FlowHash flowHash = FlowHashFactory.create(input);
121                                 deviceContext.getDeviceFlowRegistry().markToBeremoved(flowHash);
122                             }
123
124                             @Override
125                             public void onFailure(final Throwable throwable) {
126                                 StringBuffer errors = new StringBuffer();
127                                 try {
128                                     RpcResult<Void> result = future.get();
129                                     Collection<RpcError> rpcErrors = result.getErrors();
130                                     if (null != rpcErrors && rpcErrors.size() > 0) {
131                                         for (RpcError rpcError : rpcErrors) {
132                                             errors.append(rpcError.getMessage());
133                                         }
134                                     }
135                                 } catch (InterruptedException | ExecutionException e) {
136                                     LOG.trace("Flow modification failed. Can't read errors from RpcResult.");
137                                 }
138                                 LOG.trace("Flow modification failed. Errors : {}", errors.toString());
139                             }
140                         });
141                         return future;
142                     }
143                 });
144     }
145
146     @Override
147     public Future<RpcResult<UpdateFlowOutput>> updateFlow(final UpdateFlowInput input) {
148         final UpdateFlowInput in = input;
149         final UpdatedFlow updated = in.getUpdatedFlow();
150         final OriginalFlow original = in.getOriginalFlow();
151
152         final List<FlowModInputBuilder> allFlowMods = new ArrayList<>();
153         List<FlowModInputBuilder> ofFlowModInputs;
154
155         if (!FlowCreatorUtil.canModifyFlow(original, updated, version)) {
156             // We would need to remove original and add updated.
157
158             // remove flow
159             final RemoveFlowInputBuilder removeflow = new RemoveFlowInputBuilder(original);
160             final List<FlowModInputBuilder> ofFlowRemoveInput = FlowConvertor.toFlowModInputs(removeflow.build(),
161                     version, datapathId);
162             // remove flow should be the first
163             allFlowMods.addAll(ofFlowRemoveInput);
164             final AddFlowInputBuilder addFlowInputBuilder = new AddFlowInputBuilder(updated);
165             ofFlowModInputs = FlowConvertor.toFlowModInputs(addFlowInputBuilder.build(), version, datapathId);
166         } else {
167             ofFlowModInputs = FlowConvertor.toFlowModInputs(updated, version, datapathId);
168         }
169
170         allFlowMods.addAll(ofFlowModInputs);
171         ListenableFuture future = processFlowModInputBuilders(allFlowMods);
172         Futures.addCallback(future, new FutureCallback() {
173             @Override
174             public void onSuccess(final Object o) {
175                 FlowHash flowHash = FlowHashFactory.create(original);
176                 deviceContext.getDeviceFlowRegistry().markToBeremoved(flowHash);
177
178                 flowHash = FlowHashFactory.create(updated);
179                 FlowId flowId = input.getFlowRef().getValue().firstKeyOf(Flow.class, FlowKey.class).getId();
180                 FlowDescriptor flowDescriptor = FlowDescriptorFactory.create(updated.getTableId(), flowId);
181                 deviceContext.getDeviceFlowRegistry().store(flowHash, flowDescriptor);
182
183             }
184
185             @Override
186             public void onFailure(final Throwable throwable) {
187
188             }
189         });
190         return future;
191     }
192
193     private <T> ListenableFuture<RpcResult<T>> processFlowModInputBuilders(
194             final List<FlowModInputBuilder> ofFlowModInputs) {
195         final List<ListenableFuture<RpcResult<T>>> partialFutures = new ArrayList<>();
196         for (FlowModInputBuilder flowModInputBuilder : ofFlowModInputs) {
197             ListenableFuture<RpcResult<T>> partialFuture = handleServiceCall(PRIMARY_CONNECTION, flowModInputBuilder,
198                     new Function<DataCrate<T>, ListenableFuture<RpcResult<Void>>>() {
199                         @Override
200                         public ListenableFuture<RpcResult<Void>> apply(final DataCrate<T> data) {
201                             return createResultForFlowMod(data);
202                         }
203                     });
204             partialFutures.add(partialFuture);
205         }
206
207         final ListenableFuture<List<RpcResult<T>>> allFutures = Futures.allAsList(partialFutures);
208         final SettableFuture<RpcResult<T>> finalFuture = SettableFuture.create();
209         Futures.addCallback(allFutures, new FutureCallback<List<RpcResult<T>>>() {
210             @Override
211             public void onSuccess(List<RpcResult<T>> result) {
212                 LOG.warn("Positive confirmation of flow push is not supported by OF-spec");
213                 for (FlowModInputBuilder ofFlowModInput : ofFlowModInputs) {
214                     LOG.warn("flow future result was successful [{}] = this should have never happen",
215                             ofFlowModInput.getXid());
216                 }
217                 finalFuture.setException(new DeviceDataException("positive confirmation of flow occurred"));
218             }
219
220             @Override
221             public void onFailure(Throwable t) {
222                 LOG.trace("Flow mods chained future failed.");
223                 RpcResultBuilder<T> resultBuilder;
224                 if (allFutures.isCancelled()) {
225                     if (LOG.isTraceEnabled()) {
226                         for (FlowModInputBuilder ofFlowModInput : ofFlowModInputs) {
227                             LOG.trace("flow future result was cancelled [{}] = barrier passed it without error",
228                                     ofFlowModInput.getXid());
229                         }
230                     }
231                     resultBuilder = RpcResultBuilder.<T>success();
232                 } else {
233                     resultBuilder = RpcResultBuilder.<T>failed().withError(ErrorType.APPLICATION, "", t.getMessage());
234                 }
235                 finalFuture.set(resultBuilder.build());
236             }
237         });
238
239         return finalFuture;
240     }
241
242     protected <T> ListenableFuture<RpcResult<Void>> createResultForFlowMod(final DataCrate<T> data) {
243         return createResultForFlowMod(data, data.getFlowModInputBuilder());
244     }
245
246     protected <T> ListenableFuture<RpcResult<Void>> createResultForFlowMod(final DataCrate<T> data, final FlowModInputBuilder flowModInput) {
247         final Xid xId = deviceContext.getNextXid();
248         flowModInput.setXid(xId.getValue());
249         data.getRequestContext().setXid(xId);
250         Future<RpcResult<Void>> flowModResult = provideConnectionAdapter(data.getiDConnection()).flowMod(
251                 flowModInput.build());
252         return JdkFutureAdapters.listenInPoolThread(flowModResult);
253     }
254
255 }