Merge changes from topic "mdsal-4.0.3"
[openflowplugin.git] / applications / forwardingrules-manager / src / main / java / org / opendaylight / openflowplugin / applications / frm / impl / FlowForwarder.java
1 /*
2  * Copyright (c) 2014, 2017 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.applications.frm.impl;
9
10 import static org.opendaylight.openflowplugin.applications.frm.util.FrmUtil.buildGroupInstanceIdentifier;
11 import static org.opendaylight.openflowplugin.applications.frm.util.FrmUtil.getActiveBundle;
12 import static org.opendaylight.openflowplugin.applications.frm.util.FrmUtil.getFlowId;
13 import static org.opendaylight.openflowplugin.applications.frm.util.FrmUtil.getNodeIdFromNodeIdentifier;
14 import static org.opendaylight.openflowplugin.applications.frm.util.FrmUtil.isFlowDependentOnGroup;
15 import static org.opendaylight.openflowplugin.applications.frm.util.FrmUtil.isGroupExistsOnDevice;
16
17 import com.google.common.base.Preconditions;
18 import com.google.common.util.concurrent.FluentFuture;
19 import com.google.common.util.concurrent.FutureCallback;
20 import com.google.common.util.concurrent.Futures;
21 import com.google.common.util.concurrent.ListenableFuture;
22 import com.google.common.util.concurrent.MoreExecutors;
23 import com.google.common.util.concurrent.SettableFuture;
24 import java.util.Optional;
25 import java.util.concurrent.ExecutionException;
26 import java.util.concurrent.Future;
27 import org.opendaylight.infrautils.utils.concurrent.LoggingFutures;
28 import org.opendaylight.mdsal.binding.api.DataBroker;
29 import org.opendaylight.mdsal.binding.api.DataTreeIdentifier;
30 import org.opendaylight.mdsal.binding.api.ReadTransaction;
31 import org.opendaylight.mdsal.binding.api.WriteTransaction;
32 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
33 import org.opendaylight.openflowplugin.applications.frm.ForwardingRulesManager;
34 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Uri;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowId;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.StaleFlow;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.StaleFlowBuilder;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.StaleFlowKey;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInput;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInputBuilder;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowOutput;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowTableRef;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowInputBuilder;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowOutput;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowInput;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowInputBuilder;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowOutput;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.flow.update.OriginalFlowBuilder;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.flow.update.UpdatedFlowBuilder;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.FlowRef;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.AddGroupInput;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.AddGroupInputBuilder;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.AddGroupOutput;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupRef;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.rev170124.BundleId;
65 import org.opendaylight.yangtools.concepts.ListenerRegistration;
66 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
67 import org.opendaylight.yangtools.yang.common.RpcError;
68 import org.opendaylight.yangtools.yang.common.RpcResult;
69 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
70 import org.slf4j.Logger;
71 import org.slf4j.LoggerFactory;
72
73 /**
74  * FlowForwarder It implements
75  * {@link org.opendaylight.mdsal.binding.api.DataTreeChangeListener}
76  * for WildCardedPath to {@link Flow} and ForwardingRulesCommiter interface for
77  * methods: add, update and remove {@link Flow} processing for
78  * {@link org.opendaylight.mdsal.binding.api.DataTreeModification}.
79  */
80 public class FlowForwarder extends AbstractListeningCommiter<Flow> {
81
82     private static final Logger LOG = LoggerFactory.getLogger(FlowForwarder.class);
83
84     private static final String GROUP_EXISTS_IN_DEVICE_ERROR = "GROUPEXISTS";
85
86     private ListenerRegistration<FlowForwarder> listenerRegistration;
87
88     public FlowForwarder(final ForwardingRulesManager manager, final DataBroker db) {
89         super(manager, db);
90     }
91
92     @Override
93     @SuppressWarnings("IllegalCatch")
94     public void registerListener() {
95         final DataTreeIdentifier<Flow> treeId = DataTreeIdentifier.create(LogicalDatastoreType.CONFIGURATION,
96                 getWildCardPath());
97         try {
98             listenerRegistration = dataBroker.registerDataTreeChangeListener(treeId, FlowForwarder.this);
99         } catch (final Exception e) {
100             LOG.warn("FRM Flow DataTreeChange listener registration fail!");
101             LOG.debug("FRM Flow DataTreeChange listener registration fail ..", e);
102             throw new IllegalStateException("FlowForwarder startup fail! System needs restart.", e);
103         }
104     }
105
106
107     @Override
108     public  void deregisterListener() {
109         close();
110     }
111
112     @Override
113     public void close() {
114         if (listenerRegistration != null) {
115             listenerRegistration.close();
116             listenerRegistration = null;
117         }
118     }
119
120     @Override
121     public void remove(final InstanceIdentifier<Flow> identifier, final Flow removeDataObj,
122             final InstanceIdentifier<FlowCapableNode> nodeIdent) {
123
124         final TableKey tableKey = identifier.firstKeyOf(Table.class);
125         if (tableIdValidationPrecondition(tableKey, removeDataObj)) {
126             BundleId bundleId = getActiveBundle(nodeIdent, provider);
127             if (bundleId != null) {
128                 provider.getBundleFlowListener().remove(identifier, removeDataObj, nodeIdent, bundleId);
129             } else {
130                 final RemoveFlowInputBuilder builder = new RemoveFlowInputBuilder(removeDataObj);
131                 builder.setFlowRef(new FlowRef(identifier));
132                 builder.setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)));
133                 builder.setFlowTable(new FlowTableRef(nodeIdent.child(Table.class, tableKey)));
134
135                 // This method is called only when a given flow object has been
136                 // removed from datastore. So FRM always needs to set strict flag
137                 // into remove-flow input so that only a flow entry associated with
138                 // a given flow object is removed.
139                 builder.setTransactionUri(new Uri(provider.getNewTransactionId())).setStrict(Boolean.TRUE);
140                 LoggingFutures.addErrorLogging(provider.getSalFlowService().removeFlow(builder.build()), LOG,
141                     "removeFlow");
142             }
143         }
144     }
145
146     // TODO: Pull this into ForwardingRulesCommiter and override it here
147
148     @Override
149     public ListenableFuture<RpcResult<RemoveFlowOutput>> removeWithResult(final InstanceIdentifier<Flow> identifier,
150             final Flow removeDataObj, final InstanceIdentifier<FlowCapableNode> nodeIdent) {
151
152         ListenableFuture<RpcResult<RemoveFlowOutput>> resultFuture = SettableFuture.create();
153         final TableKey tableKey = identifier.firstKeyOf(Table.class);
154         if (tableIdValidationPrecondition(tableKey, removeDataObj)) {
155             final RemoveFlowInputBuilder builder = new RemoveFlowInputBuilder(removeDataObj);
156             builder.setFlowRef(new FlowRef(identifier));
157             builder.setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)));
158             builder.setFlowTable(new FlowTableRef(nodeIdent.child(Table.class, tableKey)));
159
160             // This method is called only when a given flow object has been
161             // removed from datastore. So FRM always needs to set strict flag
162             // into remove-flow input so that only a flow entry associated with
163             // a given flow object is removed.
164             builder.setTransactionUri(new Uri(provider.getNewTransactionId())).setStrict(Boolean.TRUE);
165             resultFuture = provider.getSalFlowService().removeFlow(builder.build());
166         }
167
168         return resultFuture;
169     }
170
171     @Override
172     public void update(final InstanceIdentifier<Flow> identifier, final Flow original, final Flow update,
173             final InstanceIdentifier<FlowCapableNode> nodeIdent) {
174
175         final TableKey tableKey = identifier.firstKeyOf(Table.class);
176         if (tableIdValidationPrecondition(tableKey, update)) {
177             BundleId bundleId = getActiveBundle(nodeIdent, provider);
178             if (bundleId != null) {
179                 provider.getBundleFlowListener().update(identifier, original, update, nodeIdent, bundleId);
180             } else {
181                 final NodeId nodeId = getNodeIdFromNodeIdentifier(nodeIdent);
182                 nodeConfigurator.enqueueJob(nodeId.getValue(), () -> {
183                     final UpdateFlowInputBuilder builder = new UpdateFlowInputBuilder();
184                     builder.setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)));
185                     builder.setFlowRef(new FlowRef(identifier));
186                     builder.setTransactionUri(new Uri(provider.getNewTransactionId()));
187
188                     // This method is called only when a given flow object in datastore
189                     // has been updated. So FRM always needs to set strict flag into
190                     // update-flow input so that only a flow entry associated with
191                     // a given flow object is updated.
192                     builder.setUpdatedFlow(new UpdatedFlowBuilder(update).setStrict(Boolean.TRUE).build());
193                     builder.setOriginalFlow(new OriginalFlowBuilder(original).setStrict(Boolean.TRUE).build());
194
195                     Long groupId = isFlowDependentOnGroup(update);
196                     if (groupId != null) {
197                         LOG.trace("The flow {} is dependent on group {}. Checking if the group is already present",
198                                 getFlowId(new FlowRef(identifier)), groupId);
199                         if (isGroupExistsOnDevice(nodeIdent, groupId, provider)) {
200                             LOG.trace("The dependent group {} is already programmed. Updating the flow {}", groupId,
201                                     getFlowId(new FlowRef(identifier)));
202                             return provider.getSalFlowService().updateFlow(builder.build());
203                         } else {
204                             LOG.trace("The dependent group {} isn't programmed yet. Pushing the group", groupId);
205                             ListenableFuture<RpcResult<AddGroupOutput>> groupFuture = pushDependentGroup(nodeIdent,
206                                     groupId);
207                             SettableFuture<RpcResult<UpdateFlowOutput>> resultFuture = SettableFuture.create();
208                             Futures.addCallback(groupFuture,
209                                     new UpdateFlowCallBack(builder.build(), nodeId, resultFuture, groupId),
210                                     MoreExecutors.directExecutor());
211                             return resultFuture;
212                         }
213                     }
214
215                     LOG.trace("The flow {} is not dependent on any group. Updating the flow",
216                             getFlowId(new FlowRef(identifier)));
217                     return provider.getSalFlowService().updateFlow(builder.build());
218                 });
219             }
220         }
221     }
222
223     @Override
224     public Future<? extends RpcResult<?>> add(final InstanceIdentifier<Flow> identifier, final Flow addDataObj,
225             final InstanceIdentifier<FlowCapableNode> nodeIdent) {
226
227         final TableKey tableKey = identifier.firstKeyOf(Table.class);
228         if (tableIdValidationPrecondition(tableKey, addDataObj)) {
229             BundleId bundleId = getActiveBundle(nodeIdent, provider);
230             if (bundleId != null) {
231                 return provider.getBundleFlowListener().add(identifier, addDataObj, nodeIdent, bundleId);
232             } else {
233                 final NodeId nodeId = getNodeIdFromNodeIdentifier(nodeIdent);
234                 nodeConfigurator.enqueueJob(nodeId.getValue(), () -> {
235                     final AddFlowInputBuilder builder = new AddFlowInputBuilder(addDataObj);
236
237                     builder.setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)));
238                     builder.setFlowRef(new FlowRef(identifier));
239                     builder.setFlowTable(new FlowTableRef(nodeIdent.child(Table.class, tableKey)));
240                     builder.setTransactionUri(new Uri(provider.getNewTransactionId()));
241                     Long groupId = isFlowDependentOnGroup(addDataObj);
242                     if (groupId != null) {
243                         LOG.trace("The flow {} is dependent on group {}. Checking if the group is already present",
244                                 getFlowId(new FlowRef(identifier)), groupId);
245                         if (isGroupExistsOnDevice(nodeIdent, groupId, provider)) {
246                             LOG.trace("The dependent group {} is already programmed. Adding the flow {}", groupId,
247                                     getFlowId(new FlowRef(identifier)));
248                             return provider.getSalFlowService().addFlow(builder.build());
249                         } else {
250                             LOG.trace("The dependent group {} isn't programmed yet. Pushing the group", groupId);
251                             ListenableFuture<RpcResult<AddGroupOutput>> groupFuture = pushDependentGroup(nodeIdent,
252                                     groupId);
253                             SettableFuture<RpcResult<AddFlowOutput>> resultFuture = SettableFuture.create();
254                             Futures.addCallback(groupFuture, new AddFlowCallBack(builder.build(), nodeId, groupId,
255                                     resultFuture), MoreExecutors.directExecutor());
256                             return resultFuture;
257                         }
258                     }
259
260                     LOG.trace("The flow {} is not dependent on any group. Adding the flow",
261                             getFlowId(new FlowRef(identifier)));
262                     return provider.getSalFlowService().addFlow(builder.build());
263                 });
264             }
265         }
266         return Futures.immediateFuture(null);
267     }
268
269     @Override
270     public void createStaleMarkEntity(InstanceIdentifier<Flow> identifier, Flow del,
271             InstanceIdentifier<FlowCapableNode> nodeIdent) {
272         LOG.debug("Creating Stale-Mark entry for the switch {} for flow {} ", nodeIdent.toString(), del.toString());
273         StaleFlow staleFlow = makeStaleFlow(identifier, del, nodeIdent);
274         persistStaleFlow(staleFlow, nodeIdent);
275     }
276
277     @Override
278     protected InstanceIdentifier<Flow> getWildCardPath() {
279         return InstanceIdentifier.create(Nodes.class).child(Node.class).augmentation(FlowCapableNode.class)
280                 .child(Table.class).child(Flow.class);
281     }
282
283     private static boolean tableIdValidationPrecondition(final TableKey tableKey, final Flow flow) {
284         Preconditions.checkNotNull(tableKey, "TableKey can not be null or empty!");
285         Preconditions.checkNotNull(flow, "Flow can not be null or empty!");
286         if (!tableKey.getId().equals(flow.getTableId())) {
287             LOG.warn("TableID in URI tableId={} and in palyload tableId={} is not same.", flow.getTableId(),
288                     tableKey.getId());
289             return false;
290         }
291         return true;
292     }
293
294     private StaleFlow makeStaleFlow(InstanceIdentifier<Flow> identifier, Flow del,
295             InstanceIdentifier<FlowCapableNode> nodeIdent) {
296         StaleFlowBuilder staleFlowBuilder = new StaleFlowBuilder(del);
297         return staleFlowBuilder.setId(del.getId()).build();
298     }
299
300     private void persistStaleFlow(StaleFlow staleFlow, InstanceIdentifier<FlowCapableNode> nodeIdent) {
301         WriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
302         writeTransaction.put(LogicalDatastoreType.CONFIGURATION, getStaleFlowInstanceIdentifier(staleFlow, nodeIdent),
303                 staleFlow, false);
304
305         FluentFuture<?> submitFuture = writeTransaction.commit();
306         handleStaleFlowResultFuture(submitFuture);
307     }
308
309     private void handleStaleFlowResultFuture(FluentFuture<?> submitFuture) {
310         submitFuture.addCallback(new FutureCallback<Object>() {
311             @Override
312             public void onSuccess(Object result) {
313                 LOG.debug("Stale Flow creation success");
314             }
315
316             @Override
317             public void onFailure(Throwable throwable) {
318                 LOG.error("Stale Flow creation failed", throwable);
319             }
320         }, MoreExecutors.directExecutor());
321
322     }
323
324     private InstanceIdentifier<org.opendaylight.yang.gen.v1.urn.opendaylight
325         .flow.inventory.rev130819.tables.table.StaleFlow> getStaleFlowInstanceIdentifier(
326             StaleFlow staleFlow, InstanceIdentifier<FlowCapableNode> nodeIdent) {
327         return nodeIdent.child(Table.class, new TableKey(staleFlow.getTableId())).child(
328                 org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.StaleFlow.class,
329                 new StaleFlowKey(new FlowId(staleFlow.getId())));
330     }
331
332     private ListenableFuture<RpcResult<AddGroupOutput>> pushDependentGroup(
333             final InstanceIdentifier<FlowCapableNode> nodeIdent, final Long groupId) {
334
335         //TODO This read to the DS might have a performance impact.
336         //if the dependent group is not installed than we should just cache the parent group,
337         //till we receive the dependent group DTCN and then push it.
338
339         InstanceIdentifier<Group> groupIdent = buildGroupInstanceIdentifier(nodeIdent, groupId);
340         ListenableFuture<RpcResult<AddGroupOutput>> resultFuture;
341         LOG.info("Reading the group from config inventory: {}", groupId);
342         try (ReadTransaction readTransaction = provider.getReadTransaction()) {
343             Optional<Group> group = readTransaction.read(LogicalDatastoreType.CONFIGURATION, groupIdent).get();
344             if (group.isPresent()) {
345                 final AddGroupInputBuilder builder = new AddGroupInputBuilder(group.get());
346                 builder.setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)));
347                 builder.setGroupRef(new GroupRef(nodeIdent));
348                 builder.setTransactionUri(new Uri(provider.getNewTransactionId()));
349                 AddGroupInput addGroupInput = builder.build();
350                 resultFuture = this.provider.getSalGroupService().addGroup(addGroupInput);
351             } else {
352                 resultFuture = Futures.immediateFuture(RpcResultBuilder.<AddGroupOutput>failed()
353                         .withError(RpcError.ErrorType.APPLICATION,
354                                 "Group " + groupId + " not present in the config inventory").build());
355             }
356         } catch (InterruptedException | ExecutionException e) {
357             LOG.error("Error while reading group from config datastore for the group ID {}", groupId, e);
358             resultFuture = Futures.immediateFuture(RpcResultBuilder.<AddGroupOutput>failed()
359                     .withError(RpcError.ErrorType.APPLICATION,
360                             "Error while reading group " + groupId + " from inventory").build());
361         }
362         return resultFuture;
363     }
364
365     private final class AddFlowCallBack implements FutureCallback<RpcResult<AddGroupOutput>> {
366         private final AddFlowInput addFlowInput;
367         private final NodeId nodeId;
368         private final Long groupId;
369         private final SettableFuture<RpcResult<AddFlowOutput>> resultFuture;
370
371         private AddFlowCallBack(final AddFlowInput addFlowInput, final NodeId nodeId, Long groupId,
372                 SettableFuture<RpcResult<AddFlowOutput>> resultFuture) {
373             this.addFlowInput = addFlowInput;
374             this.nodeId = nodeId;
375             this.groupId = groupId;
376             this.resultFuture = resultFuture;
377         }
378
379         @Override
380         public void onSuccess(RpcResult<AddGroupOutput> rpcResult) {
381             if (rpcResult.isSuccessful() || rpcResult.getErrors().size() == 1
382                     && rpcResult.getErrors().iterator().next().getMessage().contains(GROUP_EXISTS_IN_DEVICE_ERROR)) {
383                 provider.getDevicesGroupRegistry().storeGroup(nodeId, groupId);
384                 Futures.addCallback(provider.getSalFlowService().addFlow(addFlowInput),
385                     new FutureCallback<RpcResult<AddFlowOutput>>() {
386                         @Override
387                         public void onSuccess(RpcResult<AddFlowOutput> result) {
388                             resultFuture.set(result);
389                         }
390
391                         @Override
392                         public void onFailure(Throwable failure) {
393                             resultFuture.setException(failure);
394                         }
395                     },  MoreExecutors.directExecutor());
396
397                 LOG.debug("Flow add with id {} finished without error for node {}",
398                         getFlowId(addFlowInput.getFlowRef()), nodeId);
399             } else {
400                 LOG.error("Flow add with id {} failed for node {} with error {}", getFlowId(addFlowInput.getFlowRef()),
401                         nodeId, rpcResult.getErrors().toString());
402                 resultFuture.set(RpcResultBuilder.<AddFlowOutput>failed()
403                         .withRpcErrors(rpcResult.getErrors()).build());
404             }
405         }
406
407         @Override
408         public void onFailure(Throwable throwable) {
409             LOG.error("Service call for adding flow with id {} failed for node {}",
410                     getFlowId(addFlowInput.getFlowRef()), nodeId, throwable);
411             resultFuture.setException(throwable);
412         }
413     }
414
415     private final class UpdateFlowCallBack implements FutureCallback<RpcResult<AddGroupOutput>> {
416         private final UpdateFlowInput updateFlowInput;
417         private final NodeId nodeId;
418         private final Long groupId;
419         private final SettableFuture<RpcResult<UpdateFlowOutput>> resultFuture;
420
421         private UpdateFlowCallBack(final UpdateFlowInput updateFlowInput, final NodeId nodeId,
422                 SettableFuture<RpcResult<UpdateFlowOutput>> resultFuture, Long groupId) {
423             this.updateFlowInput = updateFlowInput;
424             this.nodeId = nodeId;
425             this.groupId = groupId;
426             this.resultFuture = resultFuture;
427         }
428
429         @Override
430         public void onSuccess(RpcResult<AddGroupOutput> rpcResult) {
431             if (rpcResult.isSuccessful() || rpcResult.getErrors().size() == 1
432                     && rpcResult.getErrors().iterator().next().getMessage().contains(GROUP_EXISTS_IN_DEVICE_ERROR)) {
433                 provider.getDevicesGroupRegistry().storeGroup(nodeId, groupId);
434                 Futures.addCallback(provider.getSalFlowService().updateFlow(updateFlowInput),
435                     new FutureCallback<RpcResult<UpdateFlowOutput>>() {
436                         @Override
437                         public void onSuccess(RpcResult<UpdateFlowOutput> result) {
438                             resultFuture.set(result);
439                         }
440
441                         @Override
442                         public void onFailure(Throwable failure) {
443                             resultFuture.setException(failure);
444                         }
445                     },  MoreExecutors.directExecutor());
446
447                 LOG.debug("Flow update with id {} finished without error for node {}",
448                         getFlowId(updateFlowInput.getFlowRef()), nodeId);
449             } else {
450                 LOG.error("Flow update with id {} failed for node {} with error {}",
451                         getFlowId(updateFlowInput.getFlowRef()), nodeId, rpcResult.getErrors().toString());
452                 resultFuture.set(RpcResultBuilder.<UpdateFlowOutput>failed()
453                         .withRpcErrors(rpcResult.getErrors()).build());
454             }
455         }
456
457         @Override
458         public void onFailure(Throwable throwable) {
459             LOG.error("Service call for updating flow with id {} failed for node {}",
460                     getFlowId(updateFlowInput.getFlowRef()), nodeId, throwable);
461             resultFuture.setException(throwable);
462         }
463     }
464 }