Do not use JdkFutureAdapters
[openflowplugin.git] / applications / forwardingrules-manager / src / main / java / org / opendaylight / openflowplugin / applications / frm / impl / FlowForwarder.java
1 /*
2  * Copyright (c) 2014, 2017 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.applications.frm.impl;
9
10 import static java.util.Objects.requireNonNull;
11 import static org.opendaylight.openflowplugin.applications.frm.util.FrmUtil.buildGroupInstanceIdentifier;
12 import static org.opendaylight.openflowplugin.applications.frm.util.FrmUtil.getActiveBundle;
13 import static org.opendaylight.openflowplugin.applications.frm.util.FrmUtil.getFlowId;
14 import static org.opendaylight.openflowplugin.applications.frm.util.FrmUtil.getNodeIdValueFromNodeIdentifier;
15 import static org.opendaylight.openflowplugin.applications.frm.util.FrmUtil.isFlowDependentOnGroup;
16 import static org.opendaylight.openflowplugin.applications.frm.util.FrmUtil.isGroupExistsOnDevice;
17
18 import com.google.common.util.concurrent.FutureCallback;
19 import com.google.common.util.concurrent.Futures;
20 import com.google.common.util.concurrent.ListenableFuture;
21 import com.google.common.util.concurrent.MoreExecutors;
22 import com.google.common.util.concurrent.SettableFuture;
23 import java.util.Optional;
24 import java.util.concurrent.ExecutionException;
25 import org.opendaylight.infrautils.utils.concurrent.LoggingFutures;
26 import org.opendaylight.mdsal.binding.api.DataBroker;
27 import org.opendaylight.mdsal.binding.api.ReadTransaction;
28 import org.opendaylight.mdsal.binding.api.WriteTransaction;
29 import org.opendaylight.mdsal.common.api.CommitInfo;
30 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
31 import org.opendaylight.openflowplugin.applications.frm.ForwardingRulesManager;
32 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Uri;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowId;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.StaleFlow;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.StaleFlowBuilder;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.StaleFlowKey;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInput;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInputBuilder;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowOutput;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowTableRef;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowInputBuilder;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowOutput;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowInput;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowInputBuilder;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowOutput;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.flow.update.OriginalFlowBuilder;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.flow.update.UpdatedFlowBuilder;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.FlowRef;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.AddGroupInput;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.AddGroupInputBuilder;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.AddGroupOutput;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupRef;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.rev170124.BundleId;
62 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
63 import org.opendaylight.yangtools.yang.common.ErrorType;
64 import org.opendaylight.yangtools.yang.common.RpcResult;
65 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
66 import org.opendaylight.yangtools.yang.common.Uint32;
67 import org.slf4j.Logger;
68 import org.slf4j.LoggerFactory;
69
70 /**
71  * FlowForwarder It implements
72  * {@link org.opendaylight.mdsal.binding.api.DataTreeChangeListener}
73  * for WildCardedPath to {@link Flow} and ForwardingRulesCommiter interface for
74  * methods: add, update and remove {@link Flow} processing for
75  * {@link org.opendaylight.mdsal.binding.api.DataTreeModification}.
76  */
77 public class FlowForwarder extends AbstractListeningCommiter<Flow> {
78     private static final Logger LOG = LoggerFactory.getLogger(FlowForwarder.class);
79     private static final String GROUP_EXISTS_IN_DEVICE_ERROR = "GROUPEXISTS";
80
81     public FlowForwarder(final ForwardingRulesManager manager, final DataBroker db,
82                          final ListenerRegistrationHelper registrationHelper) {
83         super(manager, db, registrationHelper);
84     }
85
86     @Override
87     public void remove(final InstanceIdentifier<Flow> identifier, final Flow removeDataObj,
88             final InstanceIdentifier<FlowCapableNode> nodeIdent) {
89
90         final TableKey tableKey = identifier.firstKeyOf(Table.class);
91         if (tableIdValidationPrecondition(tableKey, removeDataObj)) {
92             BundleId bundleId = getActiveBundle(nodeIdent, provider);
93             if (bundleId != null) {
94                 provider.getBundleFlowListener().remove(identifier, removeDataObj, nodeIdent, bundleId);
95             } else {
96                 final String nodeId = getNodeIdValueFromNodeIdentifier(nodeIdent);
97                 nodeConfigurator.enqueueJob(nodeId, () -> {
98                     final RemoveFlowInputBuilder builder = new RemoveFlowInputBuilder(removeDataObj);
99                     builder.setFlowRef(new FlowRef(identifier));
100                     builder.setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)));
101                     builder.setFlowTable(new FlowTableRef(nodeIdent.child(Table.class, tableKey)));
102
103                     // This method is called only when a given flow object has been
104                     // removed from datastore. So FRM always needs to set strict flag
105                     // into remove-flow input so that only a flow entry associated with
106                     // a given flow object is removed.
107                     builder.setTransactionUri(new Uri(provider.getNewTransactionId())).setStrict(Boolean.TRUE);
108                     final ListenableFuture<RpcResult<RemoveFlowOutput>> resultFuture =
109                             provider.getSalFlowService().removeFlow(builder.build());
110                     LoggingFutures.addErrorLogging(resultFuture, LOG, "removeFlow");
111                     return resultFuture;
112                 });
113             }
114         }
115     }
116
117     // TODO: Pull this into ForwardingRulesCommiter and override it here
118
119     @Override
120     public ListenableFuture<RpcResult<RemoveFlowOutput>> removeWithResult(final InstanceIdentifier<Flow> identifier,
121             final Flow removeDataObj, final InstanceIdentifier<FlowCapableNode> nodeIdent) {
122
123         ListenableFuture<RpcResult<RemoveFlowOutput>> resultFuture = SettableFuture.create();
124         final TableKey tableKey = identifier.firstKeyOf(Table.class);
125         if (tableIdValidationPrecondition(tableKey, removeDataObj)) {
126             final RemoveFlowInputBuilder builder = new RemoveFlowInputBuilder(removeDataObj);
127             builder.setFlowRef(new FlowRef(identifier));
128             builder.setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)));
129             builder.setFlowTable(new FlowTableRef(nodeIdent.child(Table.class, tableKey)));
130
131             // This method is called only when a given flow object has been
132             // removed from datastore. So FRM always needs to set strict flag
133             // into remove-flow input so that only a flow entry associated with
134             // a given flow object is removed.
135             builder.setTransactionUri(new Uri(provider.getNewTransactionId())).setStrict(Boolean.TRUE);
136             resultFuture = provider.getSalFlowService().removeFlow(builder.build());
137         }
138
139         return resultFuture;
140     }
141
142     @Override
143     public void update(final InstanceIdentifier<Flow> identifier, final Flow original, final Flow update,
144             final InstanceIdentifier<FlowCapableNode> nodeIdent) {
145
146         final TableKey tableKey = identifier.firstKeyOf(Table.class);
147         if (tableIdValidationPrecondition(tableKey, update)) {
148             BundleId bundleId = getActiveBundle(nodeIdent, provider);
149             if (bundleId != null) {
150                 provider.getBundleFlowListener().update(identifier, original, update, nodeIdent, bundleId);
151             } else {
152                 final String nodeId = getNodeIdValueFromNodeIdentifier(nodeIdent);
153                 nodeConfigurator.enqueueJob(nodeId, () -> {
154                     final UpdateFlowInputBuilder builder = new UpdateFlowInputBuilder();
155                     builder.setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)));
156                     builder.setFlowRef(new FlowRef(identifier));
157                     builder.setTransactionUri(new Uri(provider.getNewTransactionId()));
158
159                     // This method is called only when a given flow object in datastore
160                     // has been updated. So FRM always needs to set strict flag into
161                     // update-flow input so that only a flow entry associated with
162                     // a given flow object is updated.
163                     builder.setUpdatedFlow(new UpdatedFlowBuilder(update).setStrict(Boolean.TRUE).build());
164                     builder.setOriginalFlow(new OriginalFlowBuilder(original).setStrict(Boolean.TRUE).build());
165
166                     Uint32 groupId = isFlowDependentOnGroup(update);
167                     if (groupId != null) {
168                         LOG.trace("The flow {} is dependent on group {}. Checking if the group is already present",
169                                 getFlowId(identifier), groupId);
170                         if (isGroupExistsOnDevice(nodeIdent, groupId, provider)) {
171                             LOG.trace("The dependent group {} is already programmed. Updating the flow {}", groupId,
172                                     getFlowId(identifier));
173                             return provider.getSalFlowService().updateFlow(builder.build());
174                         } else {
175                             LOG.trace("The dependent group {} isn't programmed yet. Pushing the group", groupId);
176                             ListenableFuture<RpcResult<AddGroupOutput>> groupFuture = pushDependentGroup(nodeIdent,
177                                     groupId);
178                             SettableFuture<RpcResult<UpdateFlowOutput>> resultFuture = SettableFuture.create();
179                             Futures.addCallback(groupFuture,
180                                     new UpdateFlowCallBack(builder.build(), nodeId, resultFuture, groupId),
181                                     MoreExecutors.directExecutor());
182                             return resultFuture;
183                         }
184                     }
185
186                     LOG.trace("The flow {} is not dependent on any group. Updating the flow",
187                             getFlowId(identifier));
188                     return provider.getSalFlowService().updateFlow(builder.build());
189                 });
190             }
191         }
192     }
193
194     @Override
195     public ListenableFuture<? extends RpcResult<?>> add(final InstanceIdentifier<Flow> identifier,
196             final Flow addDataObj, final InstanceIdentifier<FlowCapableNode> nodeIdent) {
197         final var tableKey = identifier.firstKeyOf(Table.class);
198         if (!tableIdValidationPrecondition(tableKey, addDataObj)) {
199             return Futures.immediateFuture(null);
200         }
201         final var bundleId = getActiveBundle(nodeIdent, provider);
202         if (bundleId != null) {
203             return provider.getBundleFlowListener().add(identifier, addDataObj, nodeIdent, bundleId);
204         }
205
206         final String nodeId = getNodeIdValueFromNodeIdentifier(nodeIdent);
207         return nodeConfigurator.enqueueJob(nodeId, () -> {
208             final var builder = new AddFlowInputBuilder(addDataObj)
209                 .setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)))
210                 .setFlowRef(new FlowRef(identifier))
211                 .setFlowTable(new FlowTableRef(nodeIdent.child(Table.class, tableKey)))
212                 .setTransactionUri(new Uri(provider.getNewTransactionId()));
213             final var groupId = isFlowDependentOnGroup(addDataObj);
214             if (groupId != null) {
215                 LOG.trace("The flow {} is dependent on group {}. Checking if the group is already present",
216                     getFlowId(new FlowRef(identifier)), groupId);
217                 if (isGroupExistsOnDevice(nodeIdent, groupId, provider)) {
218                     LOG.trace("The dependent group {} is already programmed. Adding the flow {}", groupId,
219                         getFlowId(new FlowRef(identifier)));
220                     return provider.getSalFlowService().addFlow(builder.build());
221                 }
222
223                 LOG.trace("The dependent group {} isn't programmed yet. Pushing the group", groupId);
224                 final var groupFuture = pushDependentGroup(nodeIdent, groupId);
225                 final var resultFuture = SettableFuture.<RpcResult<AddFlowOutput>>create();
226                 Futures.addCallback(groupFuture, new AddFlowCallBack(builder.build(), nodeId, groupId,
227                     resultFuture), MoreExecutors.directExecutor());
228                 return resultFuture;
229             }
230
231             LOG.trace("The flow {} is not dependent on any group. Adding the flow",
232                 getFlowId(new FlowRef(identifier)));
233             return provider.getSalFlowService().addFlow(builder.build());
234         });
235     }
236
237     @Override
238     public void createStaleMarkEntity(final InstanceIdentifier<Flow> identifier, final Flow del,
239             final InstanceIdentifier<FlowCapableNode> nodeIdent) {
240         LOG.debug("Creating Stale-Mark entry for the switch {} for flow {} ", nodeIdent, del);
241         StaleFlow staleFlow = makeStaleFlow(identifier, del, nodeIdent);
242         persistStaleFlow(staleFlow, nodeIdent);
243     }
244
245     @Override
246     protected InstanceIdentifier<Flow> getWildCardPath() {
247         return InstanceIdentifier.create(Nodes.class).child(Node.class).augmentation(FlowCapableNode.class)
248                 .child(Table.class).child(Flow.class);
249     }
250
251     private static boolean tableIdValidationPrecondition(final TableKey tableKey, final Flow flow) {
252         requireNonNull(tableKey, "TableKey can not be null or empty!");
253         requireNonNull(flow, "Flow can not be null or empty!");
254         if (!tableKey.getId().equals(flow.getTableId())) {
255             LOG.warn("TableID in URI tableId={} and in palyload tableId={} is not same.", flow.getTableId(),
256                     tableKey.getId());
257             return false;
258         }
259         return true;
260     }
261
262     private static StaleFlow makeStaleFlow(final InstanceIdentifier<Flow> identifier, final Flow del,
263             final InstanceIdentifier<FlowCapableNode> nodeIdent) {
264         StaleFlowBuilder staleFlowBuilder = new StaleFlowBuilder(del);
265         return staleFlowBuilder.setId(del.getId()).build();
266     }
267
268     private void persistStaleFlow(final StaleFlow staleFlow, final InstanceIdentifier<FlowCapableNode> nodeIdent) {
269         WriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
270         writeTransaction.put(LogicalDatastoreType.CONFIGURATION, getStaleFlowInstanceIdentifier(staleFlow, nodeIdent),
271                 staleFlow);
272
273         writeTransaction.commit().addCallback(new FutureCallback<CommitInfo>() {
274             @Override
275             public void onSuccess(final CommitInfo result) {
276                 LOG.debug("Stale Flow creation success");
277             }
278
279             @Override
280             public void onFailure(final Throwable throwable) {
281                 LOG.error("Stale Flow creation failed", throwable);
282             }
283         }, MoreExecutors.directExecutor());
284     }
285
286     private static InstanceIdentifier<org.opendaylight.yang.gen.v1.urn.opendaylight
287         .flow.inventory.rev130819.tables.table.StaleFlow> getStaleFlowInstanceIdentifier(
288             final StaleFlow staleFlow, final InstanceIdentifier<FlowCapableNode> nodeIdent) {
289         return nodeIdent.child(Table.class, new TableKey(staleFlow.getTableId())).child(
290                 org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.StaleFlow.class,
291                 new StaleFlowKey(new FlowId(staleFlow.getId())));
292     }
293
294     private ListenableFuture<RpcResult<AddGroupOutput>> pushDependentGroup(
295             final InstanceIdentifier<FlowCapableNode> nodeIdent, final Uint32 groupId) {
296
297         //TODO This read to the DS might have a performance impact.
298         //if the dependent group is not installed than we should just cache the parent group,
299         //till we receive the dependent group DTCN and then push it.
300
301         InstanceIdentifier<Group> groupIdent = buildGroupInstanceIdentifier(nodeIdent, groupId);
302         ListenableFuture<RpcResult<AddGroupOutput>> resultFuture;
303         LOG.info("Reading the group from config inventory: {}", groupId);
304         try (ReadTransaction readTransaction = provider.getReadTransaction()) {
305             Optional<Group> group = readTransaction.read(LogicalDatastoreType.CONFIGURATION, groupIdent).get();
306             if (group.isPresent()) {
307                 final AddGroupInputBuilder builder = new AddGroupInputBuilder(group.orElseThrow());
308                 builder.setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)));
309                 builder.setGroupRef(new GroupRef(nodeIdent));
310                 builder.setTransactionUri(new Uri(provider.getNewTransactionId()));
311                 AddGroupInput addGroupInput = builder.build();
312                 resultFuture = provider.getSalGroupService().addGroup(addGroupInput);
313             } else {
314                 resultFuture = RpcResultBuilder.<AddGroupOutput>failed()
315                         .withError(ErrorType.APPLICATION,
316                                 "Group " + groupId + " not present in the config inventory").buildFuture();
317             }
318         } catch (InterruptedException | ExecutionException e) {
319             LOG.error("Error while reading group from config datastore for the group ID {}", groupId, e);
320             resultFuture = RpcResultBuilder.<AddGroupOutput>failed()
321                     .withError(ErrorType.APPLICATION,
322                             "Error while reading group " + groupId + " from inventory").buildFuture();
323         }
324         return resultFuture;
325     }
326
327     private final class AddFlowCallBack implements FutureCallback<RpcResult<AddGroupOutput>> {
328         private final AddFlowInput addFlowInput;
329         private final String nodeId;
330         private final Uint32 groupId;
331         private final SettableFuture<RpcResult<AddFlowOutput>> resultFuture;
332
333         private AddFlowCallBack(final AddFlowInput addFlowInput, final String nodeId, final Uint32 groupId,
334                 final SettableFuture<RpcResult<AddFlowOutput>> resultFuture) {
335             this.addFlowInput = addFlowInput;
336             this.nodeId = nodeId;
337             this.groupId = groupId;
338             this.resultFuture = resultFuture;
339         }
340
341         @Override
342         public void onSuccess(final RpcResult<AddGroupOutput> rpcResult) {
343             if (rpcResult.isSuccessful() || rpcResult.getErrors().size() == 1
344                     && rpcResult.getErrors().iterator().next().getMessage().contains(GROUP_EXISTS_IN_DEVICE_ERROR)) {
345                 provider.getDevicesGroupRegistry().storeGroup(nodeId, groupId);
346                 Futures.addCallback(provider.getSalFlowService().addFlow(addFlowInput),
347                     new FutureCallback<RpcResult<AddFlowOutput>>() {
348                         @Override
349                         public void onSuccess(final RpcResult<AddFlowOutput> result) {
350                             resultFuture.set(result);
351                         }
352
353                         @Override
354                         public void onFailure(final Throwable failure) {
355                             resultFuture.setException(failure);
356                         }
357                     },  MoreExecutors.directExecutor());
358
359                 LOG.debug("Flow add with id {} finished without error for node {}",
360                         getFlowId(addFlowInput.getFlowRef()), nodeId);
361             } else {
362                 LOG.error("Flow add with id {} failed for node {} with error {}", getFlowId(addFlowInput.getFlowRef()),
363                         nodeId, rpcResult.getErrors());
364                 resultFuture.set(RpcResultBuilder.<AddFlowOutput>failed()
365                         .withRpcErrors(rpcResult.getErrors()).build());
366             }
367         }
368
369         @Override
370         public void onFailure(final Throwable throwable) {
371             LOG.error("Service call for adding flow with id {} failed for node {}",
372                     getFlowId(addFlowInput.getFlowRef()), nodeId, throwable);
373             resultFuture.setException(throwable);
374         }
375     }
376
377     private final class UpdateFlowCallBack implements FutureCallback<RpcResult<AddGroupOutput>> {
378         private final UpdateFlowInput updateFlowInput;
379         private final String nodeId;
380         private final Uint32 groupId;
381         private final SettableFuture<RpcResult<UpdateFlowOutput>> resultFuture;
382
383         private UpdateFlowCallBack(final UpdateFlowInput updateFlowInput, final String nodeId,
384                 final SettableFuture<RpcResult<UpdateFlowOutput>> resultFuture, final Uint32 groupId) {
385             this.updateFlowInput = updateFlowInput;
386             this.nodeId = nodeId;
387             this.groupId = groupId;
388             this.resultFuture = resultFuture;
389         }
390
391         @Override
392         public void onSuccess(final RpcResult<AddGroupOutput> rpcResult) {
393             if (rpcResult.isSuccessful() || rpcResult.getErrors().size() == 1
394                     && rpcResult.getErrors().iterator().next().getMessage().contains(GROUP_EXISTS_IN_DEVICE_ERROR)) {
395                 provider.getDevicesGroupRegistry().storeGroup(nodeId, groupId);
396                 Futures.addCallback(provider.getSalFlowService().updateFlow(updateFlowInput),
397                     new FutureCallback<RpcResult<UpdateFlowOutput>>() {
398                         @Override
399                         public void onSuccess(final RpcResult<UpdateFlowOutput> result) {
400                             resultFuture.set(result);
401                         }
402
403                         @Override
404                         public void onFailure(final Throwable failure) {
405                             resultFuture.setException(failure);
406                         }
407                     },  MoreExecutors.directExecutor());
408
409                 LOG.debug("Flow update with id {} finished without error for node {}",
410                         getFlowId(updateFlowInput.getFlowRef()), nodeId);
411             } else {
412                 LOG.error("Flow update with id {} failed for node {} with error {}",
413                         getFlowId(updateFlowInput.getFlowRef()), nodeId, rpcResult.getErrors());
414                 resultFuture.set(RpcResultBuilder.<UpdateFlowOutput>failed()
415                         .withRpcErrors(rpcResult.getErrors()).build());
416             }
417         }
418
419         @Override
420         public void onFailure(final Throwable throwable) {
421             LOG.error("Service call for updating flow with id {} failed for node {}",
422                     getFlowId(updateFlowInput.getFlowRef()), nodeId, throwable);
423             resultFuture.setException(throwable);
424         }
425     }
426 }