add flow to device flow registry when processing addFlow
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / services / SalFlowServiceImpl.java
1 /**
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.impl.services;
9
10 import com.google.common.base.Function;
11 import com.google.common.util.concurrent.FutureCallback;
12 import com.google.common.util.concurrent.Futures;
13 import com.google.common.util.concurrent.JdkFutureAdapters;
14 import com.google.common.util.concurrent.ListenableFuture;
15 import com.google.common.util.concurrent.SettableFuture;
16 import java.math.BigInteger;
17 import java.util.ArrayList;
18 import java.util.List;
19 import java.util.concurrent.Future;
20 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
21 import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
22 import org.opendaylight.openflowplugin.api.openflow.device.RequestContextStack;
23 import org.opendaylight.openflowplugin.api.openflow.device.Xid;
24 import org.opendaylight.openflowplugin.api.openflow.flow.registry.FlowHash;
25 import org.opendaylight.openflowplugin.impl.flow.registry.FlowHashFactory;
26 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.FlowConvertor;
27 import org.opendaylight.openflowplugin.openflow.md.util.FlowCreatorUtil;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowId;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInput;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInputBuilder;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowOutput;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowInput;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowInputBuilder;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowOutput;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowService;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowInput;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowOutput;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.flow.update.OriginalFlow;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.flow.update.UpdatedFlow;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowModInputBuilder;
43 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
44 import org.opendaylight.yangtools.yang.common.RpcResult;
45 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
46 import org.slf4j.Logger;
47
48 public class SalFlowServiceImpl extends CommonService implements SalFlowService {
49
50     private static final Logger LOG = org.slf4j.LoggerFactory.getLogger(SalFlowServiceImpl.class);
51
52     public SalFlowServiceImpl(final RequestContextStack requestContextStack, final DeviceContext deviceContext) {
53         super(requestContextStack, deviceContext);
54     }
55
56     <T, F> ListenableFuture<RpcResult<T>> handleServiceCall(final BigInteger connectionID,
57                                                             final FlowModInputBuilder flowModInputBuilder, final Function<DataCrate<T>, ListenableFuture<RpcResult<F>>> function) {
58         LOG.debug("Calling the FlowMod RPC method on MessageDispatchService");
59
60         final RequestContext<T> requestContext = requestContextStack.createRequestContext();
61         final SettableFuture<RpcResult<T>> result = requestContextStack.storeOrFail(requestContext);
62         final DataCrate<T> dataCrate = DataCrateBuilder.<T>builder().setiDConnection(connectionID)
63                 .setRequestContext(requestContext).setFlowModInputBuilder(flowModInputBuilder).build();
64
65         if (!result.isDone()) {
66             final ListenableFuture<RpcResult<F>> resultFromOFLib = function.apply(dataCrate);
67
68             final OFJResult2RequestCtxFuture<T> OFJResult2RequestCtxFuture = new OFJResult2RequestCtxFuture<>(requestContext, deviceContext);
69             OFJResult2RequestCtxFuture.processResultFromOfJava(resultFromOFLib);
70
71         } else {
72             RequestContextUtil.closeRequstContext(requestContext);
73         }
74         return result;
75     }
76
77     @Override
78     public Future<RpcResult<AddFlowOutput>> addFlow(final AddFlowInput input) {
79         final List<FlowModInputBuilder> ofFlowModInputs = FlowConvertor.toFlowModInputs(input, version, datapathId);
80         final ListenableFuture future = processFlowModInputBuilders(ofFlowModInputs);
81
82         if (!future.isCancelled()) {
83             FlowHash flowHash = FlowHashFactory.create(input);
84             FlowId flowId = input.getFlowRef().getValue().firstKeyOf(Flow.class, FlowKey.class).getId();
85             deviceContext.getFlowRegistry().store(flowHash, flowId);
86         }
87
88         return future;
89     }
90
91     @Override
92     public Future<RpcResult<RemoveFlowOutput>> removeFlow(final RemoveFlowInput input) {
93
94         return this.<RemoveFlowOutput, Void>handleServiceCall(PRIMARY_CONNECTION,
95                 new Function<DataCrate<RemoveFlowOutput>, ListenableFuture<RpcResult<Void>>>() {
96                     @Override
97                     public ListenableFuture<RpcResult<Void>> apply(final DataCrate<RemoveFlowOutput> data) {
98                         final FlowModInputBuilder ofFlowModInput = FlowConvertor.toFlowModInput(input, version,
99                                 datapathId);
100                         ListenableFuture future = createResultForFlowMod(data, ofFlowModInput);
101                         Futures.addCallback(future, new FutureCallback() {
102                             @Override
103                             public void onSuccess(final Object o) {
104                                 FlowHash flowHash = FlowHashFactory.create(input);
105                                 deviceContext.getFlowRegistry().remove(flowHash);
106                             }
107
108                             @Override
109                             public void onFailure(final Throwable throwable) {
110                                 //NOOP
111                             }
112                         });
113                         return future;
114                     }
115                 });
116     }
117
118     @Override
119     public Future<RpcResult<UpdateFlowOutput>> updateFlow(final UpdateFlowInput input) {
120         final UpdateFlowInput in = input;
121         final UpdatedFlow updated = in.getUpdatedFlow();
122         final OriginalFlow original = in.getOriginalFlow();
123
124         final List<FlowModInputBuilder> allFlowMods = new ArrayList<>();
125         List<FlowModInputBuilder> ofFlowModInputs;
126
127         if (!FlowCreatorUtil.canModifyFlow(original, updated, version)) {
128             // We would need to remove original and add updated.
129
130             // remove flow
131             final RemoveFlowInputBuilder removeflow = new RemoveFlowInputBuilder(original);
132             final List<FlowModInputBuilder> ofFlowRemoveInput = FlowConvertor.toFlowModInputs(removeflow.build(),
133                     version, datapathId);
134             // remove flow should be the first
135             allFlowMods.addAll(ofFlowRemoveInput);
136             final AddFlowInputBuilder addFlowInputBuilder = new AddFlowInputBuilder(updated);
137             ofFlowModInputs = FlowConvertor.toFlowModInputs(addFlowInputBuilder.build(), version, datapathId);
138         } else {
139             ofFlowModInputs = FlowConvertor.toFlowModInputs(updated, version, datapathId);
140         }
141
142         allFlowMods.addAll(ofFlowModInputs);
143         ListenableFuture future = processFlowModInputBuilders(allFlowMods);
144         Futures.addCallback(future, new FutureCallback() {
145             @Override
146             public void onSuccess(final Object o) {
147                 FlowHash flowHash = FlowHashFactory.create(original);
148                 deviceContext.getFlowRegistry().remove(flowHash);
149
150                 flowHash = FlowHashFactory.create(updated);
151                 FlowId flowId = input.getFlowRef().getValue().firstKeyOf(Flow.class, FlowKey.class).getId();
152                 deviceContext.getFlowRegistry().store(flowHash, flowId);
153
154             }
155
156             @Override
157             public void onFailure(final Throwable throwable) {
158
159             }
160         });
161         return future;
162     }
163
164     private <T> ListenableFuture<RpcResult<T>> processFlowModInputBuilders(
165             final List<FlowModInputBuilder> ofFlowModInputs) {
166         final List<ListenableFuture<RpcResult<T>>> partialFutures = new ArrayList<>();
167         for (FlowModInputBuilder flowModInputBuilder : ofFlowModInputs) {
168             ListenableFuture<RpcResult<T>> partialFuture = handleServiceCall(PRIMARY_CONNECTION, flowModInputBuilder,
169                     new Function<DataCrate<T>, ListenableFuture<RpcResult<Void>>>() {
170                         @Override
171                         public ListenableFuture<RpcResult<Void>> apply(final DataCrate<T> data) {
172                             return createResultForFlowMod(data);
173                         }
174                     });
175             partialFutures.add(partialFuture);
176         }
177
178         ListenableFuture<List<RpcResult<T>>> allFutures = Futures.allAsList(partialFutures);
179         final SettableFuture<RpcResult<T>> finalFuture = SettableFuture.create();
180         Futures.addCallback(allFutures, new FutureCallback<List<RpcResult<T>>>() {
181             @Override
182             public void onSuccess(List<RpcResult<T>> result) {
183                 for (RpcResult<T> rpcResult : result) {
184                     if (rpcResult.isSuccessful()) {
185                         // TODO: AddFlowOutput has getTransactionId() - shouldn't it have some value?
186                         finalFuture.set(RpcResultBuilder.<T>success().build());
187                     }
188                 }
189             }
190
191             @Override
192             public void onFailure(Throwable t) {
193                 finalFuture.set(RpcResultBuilder.<T>failed().withError(ErrorType.APPLICATION, "", t.getMessage())
194                         .build());
195             }
196         });
197
198         return finalFuture;
199     }
200
201     protected <T> ListenableFuture<RpcResult<Void>> createResultForFlowMod(final DataCrate<T> data) {
202         return createResultForFlowMod(data, data.getFlowModInputBuilder());
203     }
204
205     protected <T> ListenableFuture<RpcResult<Void>> createResultForFlowMod(final DataCrate<T> data, final FlowModInputBuilder flowModInput) {
206         final Xid xId = deviceContext.getNextXid();
207         flowModInput.setXid(xId.getValue());
208         data.getRequestContext().setXid(xId);
209         Future<RpcResult<Void>> flowModResult = provideConnectionAdapter(data.getiDConnection()).flowMod(
210                 flowModInput.build());
211         return JdkFutureAdapters.listenInPoolThread(flowModResult);
212     }
213
214 }