b8cc16736ac3d0692846e4db5735e8cba16b4aca
[openflowplugin.git] / applications / forwardingrules-manager / src / main / java / org / opendaylight / openflowplugin / applications / frm / impl / FlowForwarder.java
1 /*
2  * Copyright (c) 2014, 2017 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.applications.frm.impl;
9
10 import static org.opendaylight.openflowplugin.applications.frm.util.FrmUtil.buildGroupInstanceIdentifier;
11 import static org.opendaylight.openflowplugin.applications.frm.util.FrmUtil.getActiveBundle;
12 import static org.opendaylight.openflowplugin.applications.frm.util.FrmUtil.getFlowId;
13 import static org.opendaylight.openflowplugin.applications.frm.util.FrmUtil.getNodeIdValueFromNodeIdentifier;
14 import static org.opendaylight.openflowplugin.applications.frm.util.FrmUtil.isFlowDependentOnGroup;
15 import static org.opendaylight.openflowplugin.applications.frm.util.FrmUtil.isGroupExistsOnDevice;
16
17 import com.google.common.base.Preconditions;
18 import com.google.common.util.concurrent.FluentFuture;
19 import com.google.common.util.concurrent.FutureCallback;
20 import com.google.common.util.concurrent.Futures;
21 import com.google.common.util.concurrent.ListenableFuture;
22 import com.google.common.util.concurrent.MoreExecutors;
23 import com.google.common.util.concurrent.SettableFuture;
24 import java.util.Optional;
25 import java.util.concurrent.ExecutionException;
26 import java.util.concurrent.Future;
27 import org.opendaylight.infrautils.utils.concurrent.LoggingFutures;
28 import org.opendaylight.mdsal.binding.api.DataBroker;
29 import org.opendaylight.mdsal.binding.api.DataTreeIdentifier;
30 import org.opendaylight.mdsal.binding.api.ReadTransaction;
31 import org.opendaylight.mdsal.binding.api.WriteTransaction;
32 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
33 import org.opendaylight.openflowplugin.applications.frm.ForwardingRulesManager;
34 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Uri;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowId;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.StaleFlow;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.StaleFlowBuilder;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.StaleFlowKey;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInput;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInputBuilder;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowOutput;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowTableRef;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowInputBuilder;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowOutput;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowInput;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowInputBuilder;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowOutput;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.flow.update.OriginalFlowBuilder;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.flow.update.UpdatedFlowBuilder;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.FlowRef;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.AddGroupInput;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.AddGroupInputBuilder;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.AddGroupOutput;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupRef;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.rev170124.BundleId;
64 import org.opendaylight.yangtools.concepts.ListenerRegistration;
65 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
66 import org.opendaylight.yangtools.yang.common.RpcError;
67 import org.opendaylight.yangtools.yang.common.RpcResult;
68 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
69 import org.opendaylight.yangtools.yang.common.Uint32;
70 import org.slf4j.Logger;
71 import org.slf4j.LoggerFactory;
72
/**
 * FlowForwarder implements
 * {@link org.opendaylight.mdsal.binding.api.DataTreeChangeListener}
 * for the wildcarded path to {@link Flow}, and the ForwardingRulesCommiter interface,
 * whose add, update and remove methods process the {@link Flow} changes carried by a
 * {@link org.opendaylight.mdsal.binding.api.DataTreeModification}.
 */
80 public class FlowForwarder extends AbstractListeningCommiter<Flow> {
81
82     private static final Logger LOG = LoggerFactory.getLogger(FlowForwarder.class);
83
84     private static final String GROUP_EXISTS_IN_DEVICE_ERROR = "GROUPEXISTS";
85
86     private ListenerRegistration<FlowForwarder> listenerRegistration;
87
88     public FlowForwarder(final ForwardingRulesManager manager, final DataBroker db) {
89         super(manager, db);
90     }
91
92     @Override
93     @SuppressWarnings("IllegalCatch")
94     public void registerListener() {
95         final DataTreeIdentifier<Flow> treeId = DataTreeIdentifier.create(LogicalDatastoreType.CONFIGURATION,
96                 getWildCardPath());
97         try {
98             listenerRegistration = dataBroker.registerDataTreeChangeListener(treeId, FlowForwarder.this);
99         } catch (final Exception e) {
100             LOG.warn("FRM Flow DataTreeChange listener registration fail!");
101             LOG.debug("FRM Flow DataTreeChange listener registration fail ..", e);
102             throw new IllegalStateException("FlowForwarder startup fail! System needs restart.", e);
103         }
104     }
105
106
107     @Override
108     public  void deregisterListener() {
109         close();
110     }
111
112     @Override
113     public void close() {
114         if (listenerRegistration != null) {
115             listenerRegistration.close();
116             listenerRegistration = null;
117         }
118     }
119
120     @Override
121     public void remove(final InstanceIdentifier<Flow> identifier, final Flow removeDataObj,
122             final InstanceIdentifier<FlowCapableNode> nodeIdent) {
123
124         final TableKey tableKey = identifier.firstKeyOf(Table.class);
125         if (tableIdValidationPrecondition(tableKey, removeDataObj)) {
126             BundleId bundleId = getActiveBundle(nodeIdent, provider);
127             if (bundleId != null) {
128                 provider.getBundleFlowListener().remove(identifier, removeDataObj, nodeIdent, bundleId);
129             } else {
130                 final String nodeId = getNodeIdValueFromNodeIdentifier(nodeIdent);
131                 nodeConfigurator.enqueueJob(nodeId, () -> {
132                     final RemoveFlowInputBuilder builder = new RemoveFlowInputBuilder(removeDataObj);
133                     builder.setFlowRef(new FlowRef(identifier));
134                     builder.setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)));
135                     builder.setFlowTable(new FlowTableRef(nodeIdent.child(Table.class, tableKey)));
136
137                     // This method is called only when a given flow object has been
138                     // removed from datastore. So FRM always needs to set strict flag
139                     // into remove-flow input so that only a flow entry associated with
140                     // a given flow object is removed.
141                     builder.setTransactionUri(new Uri(provider.getNewTransactionId())).setStrict(Boolean.TRUE);
142                     final ListenableFuture<RpcResult<RemoveFlowOutput>> resultFuture =
143                             provider.getSalFlowService().removeFlow(builder.build());
144                     LoggingFutures.addErrorLogging(resultFuture, LOG, "removeFlow");
145                     return resultFuture;
146                 });
147             }
148         }
149     }
150
151     // TODO: Pull this into ForwardingRulesCommiter and override it here
152
153     @Override
154     public ListenableFuture<RpcResult<RemoveFlowOutput>> removeWithResult(final InstanceIdentifier<Flow> identifier,
155             final Flow removeDataObj, final InstanceIdentifier<FlowCapableNode> nodeIdent) {
156
157         ListenableFuture<RpcResult<RemoveFlowOutput>> resultFuture = SettableFuture.create();
158         final TableKey tableKey = identifier.firstKeyOf(Table.class);
159         if (tableIdValidationPrecondition(tableKey, removeDataObj)) {
160             final RemoveFlowInputBuilder builder = new RemoveFlowInputBuilder(removeDataObj);
161             builder.setFlowRef(new FlowRef(identifier));
162             builder.setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)));
163             builder.setFlowTable(new FlowTableRef(nodeIdent.child(Table.class, tableKey)));
164
165             // This method is called only when a given flow object has been
166             // removed from datastore. So FRM always needs to set strict flag
167             // into remove-flow input so that only a flow entry associated with
168             // a given flow object is removed.
169             builder.setTransactionUri(new Uri(provider.getNewTransactionId())).setStrict(Boolean.TRUE);
170             resultFuture = provider.getSalFlowService().removeFlow(builder.build());
171         }
172
173         return resultFuture;
174     }
175
176     @Override
177     public void update(final InstanceIdentifier<Flow> identifier, final Flow original, final Flow update,
178             final InstanceIdentifier<FlowCapableNode> nodeIdent) {
179
180         final TableKey tableKey = identifier.firstKeyOf(Table.class);
181         if (tableIdValidationPrecondition(tableKey, update)) {
182             BundleId bundleId = getActiveBundle(nodeIdent, provider);
183             if (bundleId != null) {
184                 provider.getBundleFlowListener().update(identifier, original, update, nodeIdent, bundleId);
185             } else {
186                 final String nodeId = getNodeIdValueFromNodeIdentifier(nodeIdent);
187                 nodeConfigurator.enqueueJob(nodeId, () -> {
188                     final UpdateFlowInputBuilder builder = new UpdateFlowInputBuilder();
189                     builder.setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)));
190                     builder.setFlowRef(new FlowRef(identifier));
191                     builder.setTransactionUri(new Uri(provider.getNewTransactionId()));
192
193                     // This method is called only when a given flow object in datastore
194                     // has been updated. So FRM always needs to set strict flag into
195                     // update-flow input so that only a flow entry associated with
196                     // a given flow object is updated.
197                     builder.setUpdatedFlow(new UpdatedFlowBuilder(update).setStrict(Boolean.TRUE).build());
198                     builder.setOriginalFlow(new OriginalFlowBuilder(original).setStrict(Boolean.TRUE).build());
199
200                     Uint32 groupId = isFlowDependentOnGroup(update);
201                     if (groupId != null) {
202                         LOG.trace("The flow {} is dependent on group {}. Checking if the group is already present",
203                                 getFlowId(identifier), groupId);
204                         if (isGroupExistsOnDevice(nodeIdent, groupId, provider)) {
205                             LOG.trace("The dependent group {} is already programmed. Updating the flow {}", groupId,
206                                     getFlowId(identifier));
207                             return provider.getSalFlowService().updateFlow(builder.build());
208                         } else {
209                             LOG.trace("The dependent group {} isn't programmed yet. Pushing the group", groupId);
210                             ListenableFuture<RpcResult<AddGroupOutput>> groupFuture = pushDependentGroup(nodeIdent,
211                                     groupId);
212                             SettableFuture<RpcResult<UpdateFlowOutput>> resultFuture = SettableFuture.create();
213                             Futures.addCallback(groupFuture,
214                                     new UpdateFlowCallBack(builder.build(), nodeId, resultFuture, groupId),
215                                     MoreExecutors.directExecutor());
216                             return resultFuture;
217                         }
218                     }
219
220                     LOG.trace("The flow {} is not dependent on any group. Updating the flow",
221                             getFlowId(identifier));
222                     return provider.getSalFlowService().updateFlow(builder.build());
223                 });
224             }
225         }
226     }
227
228     @Override
229     public Future<? extends RpcResult<?>> add(final InstanceIdentifier<Flow> identifier, final Flow addDataObj,
230             final InstanceIdentifier<FlowCapableNode> nodeIdent) {
231
232         final TableKey tableKey = identifier.firstKeyOf(Table.class);
233         if (tableIdValidationPrecondition(tableKey, addDataObj)) {
234             BundleId bundleId = getActiveBundle(nodeIdent, provider);
235             if (bundleId != null) {
236                 return provider.getBundleFlowListener().add(identifier, addDataObj, nodeIdent, bundleId);
237             } else {
238                 final String nodeId = getNodeIdValueFromNodeIdentifier(nodeIdent);
239                 nodeConfigurator.enqueueJob(nodeId, () -> {
240                     final AddFlowInputBuilder builder = new AddFlowInputBuilder(addDataObj);
241
242                     builder.setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)));
243                     builder.setFlowRef(new FlowRef(identifier));
244                     builder.setFlowTable(new FlowTableRef(nodeIdent.child(Table.class, tableKey)));
245                     builder.setTransactionUri(new Uri(provider.getNewTransactionId()));
246                     Uint32 groupId = isFlowDependentOnGroup(addDataObj);
247                     if (groupId != null) {
248                         LOG.trace("The flow {} is dependent on group {}. Checking if the group is already present",
249                                 getFlowId(new FlowRef(identifier)), groupId);
250                         if (isGroupExistsOnDevice(nodeIdent, groupId, provider)) {
251                             LOG.trace("The dependent group {} is already programmed. Adding the flow {}", groupId,
252                                     getFlowId(new FlowRef(identifier)));
253                             return provider.getSalFlowService().addFlow(builder.build());
254                         } else {
255                             LOG.trace("The dependent group {} isn't programmed yet. Pushing the group", groupId);
256                             ListenableFuture<RpcResult<AddGroupOutput>> groupFuture = pushDependentGroup(nodeIdent,
257                                     groupId);
258                             SettableFuture<RpcResult<AddFlowOutput>> resultFuture = SettableFuture.create();
259                             Futures.addCallback(groupFuture, new AddFlowCallBack(builder.build(), nodeId, groupId,
260                                     resultFuture), MoreExecutors.directExecutor());
261                             return resultFuture;
262                         }
263                     }
264
265                     LOG.trace("The flow {} is not dependent on any group. Adding the flow",
266                             getFlowId(new FlowRef(identifier)));
267                     return provider.getSalFlowService().addFlow(builder.build());
268                 });
269             }
270         }
271         return Futures.immediateFuture(null);
272     }
273
274     @Override
275     public void createStaleMarkEntity(InstanceIdentifier<Flow> identifier, Flow del,
276             InstanceIdentifier<FlowCapableNode> nodeIdent) {
277         LOG.debug("Creating Stale-Mark entry for the switch {} for flow {} ", nodeIdent, del);
278         StaleFlow staleFlow = makeStaleFlow(identifier, del, nodeIdent);
279         persistStaleFlow(staleFlow, nodeIdent);
280     }
281
282     @Override
283     protected InstanceIdentifier<Flow> getWildCardPath() {
284         return InstanceIdentifier.create(Nodes.class).child(Node.class).augmentation(FlowCapableNode.class)
285                 .child(Table.class).child(Flow.class);
286     }
287
288     private static boolean tableIdValidationPrecondition(final TableKey tableKey, final Flow flow) {
289         Preconditions.checkNotNull(tableKey, "TableKey can not be null or empty!");
290         Preconditions.checkNotNull(flow, "Flow can not be null or empty!");
291         if (!tableKey.getId().equals(flow.getTableId())) {
292             LOG.warn("TableID in URI tableId={} and in palyload tableId={} is not same.", flow.getTableId(),
293                     tableKey.getId());
294             return false;
295         }
296         return true;
297     }
298
299     private StaleFlow makeStaleFlow(InstanceIdentifier<Flow> identifier, Flow del,
300             InstanceIdentifier<FlowCapableNode> nodeIdent) {
301         StaleFlowBuilder staleFlowBuilder = new StaleFlowBuilder(del);
302         return staleFlowBuilder.setId(del.getId()).build();
303     }
304
305     private void persistStaleFlow(StaleFlow staleFlow, InstanceIdentifier<FlowCapableNode> nodeIdent) {
306         WriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
307         writeTransaction.put(LogicalDatastoreType.CONFIGURATION, getStaleFlowInstanceIdentifier(staleFlow, nodeIdent),
308                 staleFlow, false);
309
310         FluentFuture<?> submitFuture = writeTransaction.commit();
311         handleStaleFlowResultFuture(submitFuture);
312     }
313
314     private void handleStaleFlowResultFuture(FluentFuture<?> submitFuture) {
315         submitFuture.addCallback(new FutureCallback<Object>() {
316             @Override
317             public void onSuccess(Object result) {
318                 LOG.debug("Stale Flow creation success");
319             }
320
321             @Override
322             public void onFailure(Throwable throwable) {
323                 LOG.error("Stale Flow creation failed", throwable);
324             }
325         }, MoreExecutors.directExecutor());
326
327     }
328
329     private InstanceIdentifier<org.opendaylight.yang.gen.v1.urn.opendaylight
330         .flow.inventory.rev130819.tables.table.StaleFlow> getStaleFlowInstanceIdentifier(
331             StaleFlow staleFlow, InstanceIdentifier<FlowCapableNode> nodeIdent) {
332         return nodeIdent.child(Table.class, new TableKey(staleFlow.getTableId())).child(
333                 org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.StaleFlow.class,
334                 new StaleFlowKey(new FlowId(staleFlow.getId())));
335     }
336
337     private ListenableFuture<RpcResult<AddGroupOutput>> pushDependentGroup(
338             final InstanceIdentifier<FlowCapableNode> nodeIdent, final Uint32 groupId) {
339
340         //TODO This read to the DS might have a performance impact.
341         //if the dependent group is not installed than we should just cache the parent group,
342         //till we receive the dependent group DTCN and then push it.
343
344         InstanceIdentifier<Group> groupIdent = buildGroupInstanceIdentifier(nodeIdent, groupId);
345         ListenableFuture<RpcResult<AddGroupOutput>> resultFuture;
346         LOG.info("Reading the group from config inventory: {}", groupId);
347         try (ReadTransaction readTransaction = provider.getReadTransaction()) {
348             Optional<Group> group = readTransaction.read(LogicalDatastoreType.CONFIGURATION, groupIdent).get();
349             if (group.isPresent()) {
350                 final AddGroupInputBuilder builder = new AddGroupInputBuilder(group.get());
351                 builder.setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)));
352                 builder.setGroupRef(new GroupRef(nodeIdent));
353                 builder.setTransactionUri(new Uri(provider.getNewTransactionId()));
354                 AddGroupInput addGroupInput = builder.build();
355                 resultFuture = this.provider.getSalGroupService().addGroup(addGroupInput);
356             } else {
357                 resultFuture = Futures.immediateFuture(RpcResultBuilder.<AddGroupOutput>failed()
358                         .withError(RpcError.ErrorType.APPLICATION,
359                                 "Group " + groupId + " not present in the config inventory").build());
360             }
361         } catch (InterruptedException | ExecutionException e) {
362             LOG.error("Error while reading group from config datastore for the group ID {}", groupId, e);
363             resultFuture = Futures.immediateFuture(RpcResultBuilder.<AddGroupOutput>failed()
364                     .withError(RpcError.ErrorType.APPLICATION,
365                             "Error while reading group " + groupId + " from inventory").build());
366         }
367         return resultFuture;
368     }
369
370     private final class AddFlowCallBack implements FutureCallback<RpcResult<AddGroupOutput>> {
371         private final AddFlowInput addFlowInput;
372         private final String nodeId;
373         private final Uint32 groupId;
374         private final SettableFuture<RpcResult<AddFlowOutput>> resultFuture;
375
376         private AddFlowCallBack(final AddFlowInput addFlowInput, final String nodeId, Uint32 groupId,
377                 SettableFuture<RpcResult<AddFlowOutput>> resultFuture) {
378             this.addFlowInput = addFlowInput;
379             this.nodeId = nodeId;
380             this.groupId = groupId;
381             this.resultFuture = resultFuture;
382         }
383
384         @Override
385         public void onSuccess(RpcResult<AddGroupOutput> rpcResult) {
386             if (rpcResult.isSuccessful() || rpcResult.getErrors().size() == 1
387                     && rpcResult.getErrors().iterator().next().getMessage().contains(GROUP_EXISTS_IN_DEVICE_ERROR)) {
388                 provider.getDevicesGroupRegistry().storeGroup(nodeId, groupId);
389                 Futures.addCallback(provider.getSalFlowService().addFlow(addFlowInput),
390                     new FutureCallback<RpcResult<AddFlowOutput>>() {
391                         @Override
392                         public void onSuccess(RpcResult<AddFlowOutput> result) {
393                             resultFuture.set(result);
394                         }
395
396                         @Override
397                         public void onFailure(Throwable failure) {
398                             resultFuture.setException(failure);
399                         }
400                     },  MoreExecutors.directExecutor());
401
402                 LOG.debug("Flow add with id {} finished without error for node {}",
403                         getFlowId(addFlowInput.getFlowRef()), nodeId);
404             } else {
405                 LOG.error("Flow add with id {} failed for node {} with error {}", getFlowId(addFlowInput.getFlowRef()),
406                         nodeId, rpcResult.getErrors());
407                 resultFuture.set(RpcResultBuilder.<AddFlowOutput>failed()
408                         .withRpcErrors(rpcResult.getErrors()).build());
409             }
410         }
411
412         @Override
413         public void onFailure(Throwable throwable) {
414             LOG.error("Service call for adding flow with id {} failed for node {}",
415                     getFlowId(addFlowInput.getFlowRef()), nodeId, throwable);
416             resultFuture.setException(throwable);
417         }
418     }
419
420     private final class UpdateFlowCallBack implements FutureCallback<RpcResult<AddGroupOutput>> {
421         private final UpdateFlowInput updateFlowInput;
422         private final String nodeId;
423         private final Uint32 groupId;
424         private final SettableFuture<RpcResult<UpdateFlowOutput>> resultFuture;
425
426         private UpdateFlowCallBack(final UpdateFlowInput updateFlowInput, final String nodeId,
427                 SettableFuture<RpcResult<UpdateFlowOutput>> resultFuture, Uint32 groupId) {
428             this.updateFlowInput = updateFlowInput;
429             this.nodeId = nodeId;
430             this.groupId = groupId;
431             this.resultFuture = resultFuture;
432         }
433
434         @Override
435         public void onSuccess(RpcResult<AddGroupOutput> rpcResult) {
436             if (rpcResult.isSuccessful() || rpcResult.getErrors().size() == 1
437                     && rpcResult.getErrors().iterator().next().getMessage().contains(GROUP_EXISTS_IN_DEVICE_ERROR)) {
438                 provider.getDevicesGroupRegistry().storeGroup(nodeId, groupId);
439                 Futures.addCallback(provider.getSalFlowService().updateFlow(updateFlowInput),
440                     new FutureCallback<RpcResult<UpdateFlowOutput>>() {
441                         @Override
442                         public void onSuccess(RpcResult<UpdateFlowOutput> result) {
443                             resultFuture.set(result);
444                         }
445
446                         @Override
447                         public void onFailure(Throwable failure) {
448                             resultFuture.setException(failure);
449                         }
450                     },  MoreExecutors.directExecutor());
451
452                 LOG.debug("Flow update with id {} finished without error for node {}",
453                         getFlowId(updateFlowInput.getFlowRef()), nodeId);
454             } else {
455                 LOG.error("Flow update with id {} failed for node {} with error {}",
456                         getFlowId(updateFlowInput.getFlowRef()), nodeId, rpcResult.getErrors());
457                 resultFuture.set(RpcResultBuilder.<UpdateFlowOutput>failed()
458                         .withRpcErrors(rpcResult.getErrors()).build());
459             }
460         }
461
462         @Override
463         public void onFailure(Throwable throwable) {
464             LOG.error("Service call for updating flow with id {} failed for node {}",
465                     getFlowId(updateFlowInput.getFlowRef()), nodeId, throwable);
466             resultFuture.setException(throwable);
467         }
468     }
469 }