Bump MRI upstreams
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / services / sal / SalFlowServiceImpl.java
index a164063badf5625e21a98ffe2a8aac25d23a6c5a..6542234329a4d3fb7d08d9221c4d503b7c0e7a80 100644 (file)
@@ -1,4 +1,4 @@
-/**
+/*
  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
  *
  * This program and the accompanying materials are made available under the
@@ -7,37 +7,29 @@
  */
 package org.opendaylight.openflowplugin.impl.services.sal;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.SettableFuture;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Objects;
-import java.util.concurrent.Future;
-import javax.annotation.Nullable;
 import org.opendaylight.openflowplugin.api.OFConstants;
+import org.opendaylight.openflowplugin.api.openflow.FlowGroupStatus;
 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
 import org.opendaylight.openflowplugin.api.openflow.device.RequestContextStack;
 import org.opendaylight.openflowplugin.api.openflow.registry.flow.DeviceFlowRegistry;
 import org.opendaylight.openflowplugin.api.openflow.registry.flow.FlowDescriptor;
 import org.opendaylight.openflowplugin.api.openflow.registry.flow.FlowRegistryKey;
-import org.opendaylight.openflowplugin.api.openflow.rpc.ItemLifeCycleSource;
-import org.opendaylight.openflowplugin.api.openflow.rpc.listener.ItemLifecycleListener;
 import org.opendaylight.openflowplugin.impl.registry.flow.FlowDescriptorFactory;
 import org.opendaylight.openflowplugin.impl.registry.flow.FlowRegistryKeyFactory;
-import org.opendaylight.openflowplugin.impl.services.singlelayer.SingleLayerFlowService;
 import org.opendaylight.openflowplugin.impl.services.multilayer.MultiLayerFlowService;
+import org.opendaylight.openflowplugin.impl.services.singlelayer.SingleLayerFlowService;
 import org.opendaylight.openflowplugin.impl.util.ErrorUtil;
+import org.opendaylight.openflowplugin.impl.util.FlowCreatorUtil;
 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
-import org.opendaylight.openflowplugin.openflow.md.util.FlowCreatorUtil;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowId;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowOutput;
@@ -49,18 +41,20 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.Upda
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.flow.update.OriginalFlow;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.flow.update.UpdatedFlow;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.FlowRef;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowModInputBuilder;
-import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
+import org.opendaylight.yangtools.yang.common.ErrorType;
 import org.opendaylight.yangtools.yang.common.RpcError;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
+import org.opendaylight.yangtools.yang.common.Uint8;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class SalFlowServiceImpl implements SalFlowService, ItemLifeCycleSource {
+public class SalFlowServiceImpl implements SalFlowService {
     private static final Logger LOG = LoggerFactory.getLogger(SalFlowServiceImpl.class);
+    private static final Uint8 OFPTT_ALL = Uint8.MAX_VALUE;
+
     private final MultiLayerFlowService<UpdateFlowOutput> flowUpdate;
     private final MultiLayerFlowService<AddFlowOutput> flowAdd;
     private final MultiLayerFlowService<RemoveFlowOutput> flowRemove;
@@ -68,60 +62,65 @@ public class SalFlowServiceImpl implements SalFlowService, ItemLifeCycleSource {
     private final SingleLayerFlowService<UpdateFlowOutput> flowUpdateMessage;
     private final SingleLayerFlowService<RemoveFlowOutput> flowRemoveMessage;
     private final DeviceContext deviceContext;
-    private ItemLifecycleListener itemLifecycleListener;
 
-    public SalFlowServiceImpl(final RequestContextStack requestContextStack, final DeviceContext deviceContext, final ConvertorExecutor convertorExecutor) {
+    public SalFlowServiceImpl(final RequestContextStack requestContextStack,
+                              final DeviceContext deviceContext,
+                              final ConvertorExecutor convertorExecutor) {
         this.deviceContext = deviceContext;
-        flowRemove = new MultiLayerFlowService<>(requestContextStack, deviceContext, RemoveFlowOutput.class, convertorExecutor);
-        flowAdd = new MultiLayerFlowService<>(requestContextStack, deviceContext, AddFlowOutput.class, convertorExecutor);
-        flowUpdate = new MultiLayerFlowService<>(requestContextStack, deviceContext, UpdateFlowOutput.class, convertorExecutor);
+        flowRemove = new MultiLayerFlowService<>(requestContextStack,
+                                                 deviceContext,
+                                                 RemoveFlowOutput.class,
+                                                 convertorExecutor);
+        flowAdd = new MultiLayerFlowService<>(requestContextStack,
+                                              deviceContext,
+                                              AddFlowOutput.class,
+                                              convertorExecutor);
+        flowUpdate = new MultiLayerFlowService<>(requestContextStack,
+                                                 deviceContext,
+                                                 UpdateFlowOutput.class,
+                                                 convertorExecutor);
         flowAddMessage = new SingleLayerFlowService<>(requestContextStack, deviceContext, AddFlowOutput.class);
         flowUpdateMessage = new SingleLayerFlowService<>(requestContextStack, deviceContext, UpdateFlowOutput.class);
-        flowRemoveMessage= new SingleLayerFlowService<>(requestContextStack, deviceContext, RemoveFlowOutput.class);
+        flowRemoveMessage = new SingleLayerFlowService<>(requestContextStack, deviceContext, RemoveFlowOutput.class);
     }
 
     @Override
-    public void setItemLifecycleListener(@Nullable ItemLifecycleListener itemLifecycleListener) {
-        this.itemLifecycleListener = itemLifecycleListener;
-    }
-
-    @Override
-    public Future<RpcResult<AddFlowOutput>> addFlow(final AddFlowInput input) {
-        final FlowRegistryKey flowRegistryKey = FlowRegistryKeyFactory.create(deviceContext.getDeviceInfo().getVersion(), input);
+    public ListenableFuture<RpcResult<AddFlowOutput>> addFlow(final AddFlowInput input) {
+        final FlowRegistryKey flowRegistryKey =
+                FlowRegistryKeyFactory.create(deviceContext.getDeviceInfo().getVersion(), input);
         final ListenableFuture<RpcResult<AddFlowOutput>> future;
-
         if (flowAddMessage.canUseSingleLayerSerialization()) {
             future = flowAddMessage.handleServiceCall(input);
-            Futures.addCallback(future, new AddFlowCallback(input, flowRegistryKey));
+            Futures.addCallback(future, new AddFlowCallback(input, flowRegistryKey),
+                    MoreExecutors.directExecutor());
         } else {
             future = flowAdd.processFlowModInputBuilders(flowAdd.toFlowModInputs(input));
-            Futures.addCallback(future, new AddFlowCallback(input, flowRegistryKey));
+            Futures.addCallback(future, new AddFlowCallback(input, flowRegistryKey),
+                    MoreExecutors.directExecutor());
 
         }
         return future;
     }
 
     @Override
-    public Future<RpcResult<RemoveFlowOutput>> removeFlow(final RemoveFlowInput input) {
+    public ListenableFuture<RpcResult<RemoveFlowOutput>> removeFlow(final RemoveFlowInput input) {
         final ListenableFuture<RpcResult<RemoveFlowOutput>> future;
-
         if (flowRemoveMessage.canUseSingleLayerSerialization()) {
             future = flowRemoveMessage.handleServiceCall(input);
-            Futures.addCallback(future, new RemoveFlowCallback(input));
+            Futures.addCallback(future, new RemoveFlowCallback(input), MoreExecutors.directExecutor());
 
         } else {
             future = flowRemove.processFlowModInputBuilders(flowRemove.toFlowModInputs(input));
-            Futures.addCallback(future, new RemoveFlowCallback(input));
+            Futures.addCallback(future, new RemoveFlowCallback(input), MoreExecutors.directExecutor());
         }
 
         return future;
     }
 
     @Override
-    public Future<RpcResult<UpdateFlowOutput>> updateFlow(final UpdateFlowInput input) {
+    public ListenableFuture<RpcResult<UpdateFlowOutput>> updateFlow(final UpdateFlowInput input) {
         final UpdatedFlow updated = input.getUpdatedFlow();
         final OriginalFlow original = input.getOriginalFlow();
-
         final List<FlowModInputBuilder> allFlowMods = new ArrayList<>();
         final List<FlowModInputBuilder> ofFlowModInputs;
 
@@ -131,18 +130,18 @@ public class SalFlowServiceImpl implements SalFlowService, ItemLifeCycleSource {
             if (!FlowCreatorUtil.canModifyFlow(original, updated, flowUpdateMessage.getVersion())) {
                 final SettableFuture<RpcResult<UpdateFlowOutput>> objectSettableFuture = SettableFuture.create();
 
-                final ListenableFuture<List<RpcResult<UpdateFlowOutput>>> listListenableFuture = Futures.successfulAsList(
-                        flowUpdateMessage.handleServiceCall(input.getOriginalFlow()),
-                        flowUpdateMessage.handleServiceCall(input.getUpdatedFlow()));
+                final ListenableFuture<List<RpcResult<UpdateFlowOutput>>> listListenableFuture =
+                        Futures.successfulAsList(flowUpdateMessage.handleServiceCall(input.getOriginalFlow()),
+                                                 flowUpdateMessage.handleServiceCall(input.getUpdatedFlow()));
 
                 Futures.addCallback(listListenableFuture, new FutureCallback<List<RpcResult<UpdateFlowOutput>>>() {
                     @Override
                     public void onSuccess(final List<RpcResult<UpdateFlowOutput>> results) {
-                        final ArrayList<RpcError> errors = new ArrayList();
+                        final ArrayList<RpcError> errors = new ArrayList<>();
                         for (RpcResult<UpdateFlowOutput> flowModResult : results) {
                             if (flowModResult == null) {
                                 errors.add(RpcResultBuilder.newError(
-                                        RpcError.ErrorType.PROTOCOL, OFConstants.APPLICATION_TAG,
+                                        ErrorType.PROTOCOL, OFConstants.APPLICATION_TAG,
                                         "unexpected flowMod result (null) occurred"));
                             } else if (!flowModResult.isSuccessful()) {
                                 errors.addAll(flowModResult.getErrors());
@@ -160,11 +159,11 @@ public class SalFlowServiceImpl implements SalFlowService, ItemLifeCycleSource {
                     }
 
                     @Override
-                    public void onFailure(final Throwable t) {
+                    public void onFailure(final Throwable throwable) {
                         RpcResultBuilder<UpdateFlowOutput> rpcResultBuilder = RpcResultBuilder.failed();
                         objectSettableFuture.set(rpcResultBuilder.build());
                     }
-                });
+                }, MoreExecutors.directExecutor());
 
                 future = objectSettableFuture;
             } else {
@@ -190,67 +189,58 @@ public class SalFlowServiceImpl implements SalFlowService, ItemLifeCycleSource {
             future = flowUpdate.processFlowModInputBuilders(allFlowMods);
         }
 
-        Futures.addCallback(future, new UpdateFlowCallback(input));
+        Futures.addCallback(future, new UpdateFlowCallback(input), MoreExecutors.directExecutor());
         return future;
     }
 
-    @VisibleForTesting
-    private static KeyedInstanceIdentifier<Flow, FlowKey> createFlowPath(FlowDescriptor flowDescriptor,
-                                                                 KeyedInstanceIdentifier<Node, NodeKey> nodePath) {
-        return nodePath.augmentation(FlowCapableNode.class)
-                .child(Table.class, flowDescriptor.getTableKey())
-                .child(Flow.class, new FlowKey(flowDescriptor.getFlowId()));
-    }
-
-    private class AddFlowCallback implements FutureCallback<RpcResult<AddFlowOutput>> {
+    private final class AddFlowCallback implements FutureCallback<RpcResult<AddFlowOutput>> {
         private final AddFlowInput input;
         private final FlowRegistryKey flowRegistryKey;
 
-        private AddFlowCallback(final AddFlowInput input,
-                                final FlowRegistryKey flowRegistryKey) {
+        private AddFlowCallback(final AddFlowInput input, final FlowRegistryKey flowRegistryKey) {
             this.input = input;
             this.flowRegistryKey = flowRegistryKey;
         }
 
         @Override
         public void onSuccess(final RpcResult<AddFlowOutput> rpcResult) {
-            if (rpcResult.isSuccessful()) {
-                final FlowDescriptor flowDescriptor;
-
-                if (Objects.nonNull(input.getFlowRef())) {
-                    final FlowId flowId = input.getFlowRef().getValue().firstKeyOf(Flow.class, FlowKey.class).getId();
-                    flowDescriptor = FlowDescriptorFactory.create(input.getTableId(), flowId);
-                    deviceContext.getDeviceFlowRegistry().store(flowRegistryKey, flowDescriptor);
-                } else {
-                    final FlowId flowId = deviceContext.getDeviceFlowRegistry().storeIfNecessary(flowRegistryKey);
-                    flowDescriptor = FlowDescriptorFactory.create(input.getTableId(), flowId);
-                }
-
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Flow add with id={} finished without error", flowDescriptor.getFlowId().getValue());
-                }
-
-                if (itemLifecycleListener != null) {
-                    KeyedInstanceIdentifier<Flow, FlowKey> flowPath = createFlowPath(flowDescriptor,
-                            deviceContext.getDeviceInfo().getNodeInstanceIdentifier());
-                    final FlowBuilder flowBuilder = new FlowBuilder(input).setId(flowDescriptor.getFlowId());
-                    itemLifecycleListener.onAdded(flowPath, flowBuilder.build());
-                }
-            } else {
+            if (!rpcResult.isSuccessful()) {
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("Flow add failed for flow={}, errors={}", input,
                             ErrorUtil.errorsToString(rpcResult.getErrors()));
                 }
+                return;
+            }
+
+            final DeviceFlowRegistry flowRegistry = deviceContext.getDeviceFlowRegistry();
+            final FlowDescriptor flowDescriptor;
+            final FlowRef flowRef = input.getFlowRef();
+            if (flowRef != null) {
+                final Uint8 tableId = input.getTableId();
+                final FlowId flowId = flowRef.getValue().firstKeyOf(Flow.class).getId();
+                flowDescriptor = FlowDescriptorFactory.create(tableId, flowId);
+
+                // FIXME: this looks like an atomic operation
+                flowRegistry.appendHistoryFlow(flowId, tableId, FlowGroupStatus.ADDED);
+                flowRegistry.storeDescriptor(flowRegistryKey, flowDescriptor);
+            } else {
+                // FIXME: this looks like an atomic operation
+                flowRegistry.store(flowRegistryKey);
+                flowDescriptor = flowRegistry.retrieveDescriptor(flowRegistryKey);
+            }
+
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Flow add with id={} finished without error", flowDescriptor.getFlowId().getValue());
             }
         }
 
         @Override
         public void onFailure(final Throwable throwable) {
-            LOG.warn("Service call for adding flow={} failed, reason: {}", input, throwable);
+            LOG.warn("Service call for adding flow={} failed", input, throwable);
         }
     }
 
-    private class RemoveFlowCallback implements FutureCallback<RpcResult<RemoveFlowOutput>> {
+    private final class RemoveFlowCallback implements FutureCallback<RpcResult<RemoveFlowOutput>> {
         private final RemoveFlowInput input;
 
         private RemoveFlowCallback(final RemoveFlowInput input) {
@@ -263,17 +253,19 @@ public class SalFlowServiceImpl implements SalFlowService, ItemLifeCycleSource {
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("Flow remove finished without error for flow={}", input);
                 }
-                FlowRegistryKey flowRegistryKey = FlowRegistryKeyFactory.create(deviceContext.getDeviceInfo().getVersion(), input);
-                deviceContext.getDeviceFlowRegistry().removeDescriptor(flowRegistryKey);
-
-                if (itemLifecycleListener != null) {
-                    final FlowDescriptor flowDescriptor =
-                            deviceContext.getDeviceFlowRegistry().retrieveIdForFlow(flowRegistryKey);
-                    if (flowDescriptor != null) {
-                        KeyedInstanceIdentifier<Flow, FlowKey> flowPath = createFlowPath(flowDescriptor,
-                                deviceContext.getDeviceInfo().getNodeInstanceIdentifier());
-                        itemLifecycleListener.onRemoved(flowPath);
+                final DeviceFlowRegistry flowRegistry = deviceContext.getDeviceFlowRegistry();
+                if (input.getTableId() != null && !input.getTableId().equals(OFPTT_ALL)) {
+                    FlowRegistryKey flowRegistryKey =
+                            FlowRegistryKeyFactory.create(deviceContext.getDeviceInfo().getVersion(), input);
+                    flowRegistry.addMark(flowRegistryKey);
+
+                    final FlowRef flowRef = input.getFlowRef();
+                    if (flowRef != null) {
+                        final FlowId flowId = flowRef.getValue().firstKeyOf(Flow.class).getId();
+                        flowRegistry.appendHistoryFlow(flowId, input.getTableId(), FlowGroupStatus.REMOVED);
                     }
+                } else {
+                    flowRegistry.clearFlowRegistry();
                 }
             } else {
                 if (LOG.isDebugEnabled()) {
@@ -285,58 +277,54 @@ public class SalFlowServiceImpl implements SalFlowService, ItemLifeCycleSource {
 
         @Override
         public void onFailure(final Throwable throwable) {
-            LOG.warn("Service call for removing flow={} failed, reason: {}", input, throwable);
+            LOG.warn("Service call for removing flow={} failed", input, throwable);
         }
     }
 
-    private class UpdateFlowCallback implements FutureCallback<RpcResult<UpdateFlowOutput>> {
+    private final class UpdateFlowCallback implements FutureCallback<RpcResult<UpdateFlowOutput>> {
         private final UpdateFlowInput input;
 
-        private UpdateFlowCallback(UpdateFlowInput input) {
+        private UpdateFlowCallback(final UpdateFlowInput input) {
             this.input = input;
         }
 
         @Override
-        public void onSuccess(final RpcResult<UpdateFlowOutput> o) {
+        public void onSuccess(final RpcResult<UpdateFlowOutput> updateFlowOutputRpcResult) {
             final DeviceFlowRegistry deviceFlowRegistry = deviceContext.getDeviceFlowRegistry();
-
             final UpdatedFlow updated = input.getUpdatedFlow();
             final OriginalFlow original = input.getOriginalFlow();
-            final FlowRegistryKey origFlowRegistryKey = FlowRegistryKeyFactory.create(deviceContext.getDeviceInfo().getVersion(), original);
-            final FlowRegistryKey updatedFlowRegistryKey = FlowRegistryKeyFactory.create(deviceContext.getDeviceInfo().getVersion(), updated);
-            final FlowDescriptor origFlowDescriptor = deviceFlowRegistry.retrieveIdForFlow(origFlowRegistryKey);
-
-            final boolean isUpdate = Objects.nonNull(origFlowDescriptor);
-            final FlowId fLowId = Objects.nonNull(input.getFlowRef())
-                    ? input.getFlowRef().getValue().firstKeyOf(Flow.class).getId()
-                    : isUpdate ? origFlowDescriptor.getFlowId() : deviceFlowRegistry.storeIfNecessary(updatedFlowRegistryKey);
-            final FlowDescriptor updatedFlowDescriptor = FlowDescriptorFactory.create(updated.getTableId(), fLowId);
-            if (isUpdate) {
-                deviceFlowRegistry.removeDescriptor(origFlowRegistryKey);
-                deviceFlowRegistry.store(updatedFlowRegistryKey, updatedFlowDescriptor);
+            final FlowRegistryKey origFlowRegistryKey =
+                    FlowRegistryKeyFactory.create(deviceContext.getDeviceInfo().getVersion(), original);
+            final FlowRegistryKey updatedFlowRegistryKey =
+                    FlowRegistryKeyFactory.create(deviceContext.getDeviceInfo().getVersion(), updated);
+            final FlowDescriptor origFlowDescriptor = deviceFlowRegistry.retrieveDescriptor(origFlowRegistryKey);
+
+            final boolean isUpdate = origFlowDescriptor != null;
+            final FlowDescriptor updatedFlowDescriptor;
+            final FlowRef flowRef = input.getFlowRef();
+            if (flowRef != null) {
+                final Uint8 tableId = updated.getTableId();
+                final FlowId flowId = flowRef.getValue().firstKeyOf(Flow.class).getId();
+                // FIXME: this does not look right, we probably want better integration
+                deviceFlowRegistry.appendHistoryFlow(flowId, tableId, FlowGroupStatus.MODIFIED);
+
+                updatedFlowDescriptor = FlowDescriptorFactory.create(tableId, flowId);
+            } else if (isUpdate) {
+                updatedFlowDescriptor = origFlowDescriptor;
+            } else {
+                deviceFlowRegistry.store(updatedFlowRegistryKey);
+                updatedFlowDescriptor = deviceFlowRegistry.retrieveDescriptor(updatedFlowRegistryKey);
             }
 
-            if (itemLifecycleListener != null) {
-                final KeyedInstanceIdentifier<Flow, FlowKey> flowPath =
-                        createFlowPath(
-                                updatedFlowDescriptor,
-                                deviceContext.getDeviceInfo().getNodeInstanceIdentifier());
-
-                final Flow flow = new FlowBuilder(updated)
-                        .setId(updatedFlowDescriptor.getFlowId())
-                        .build();
-
-                if (Objects.nonNull(origFlowDescriptor)) {
-                    itemLifecycleListener.onUpdated(flowPath, flow);
-                } else {
-                    itemLifecycleListener.onAdded(flowPath, flow);
-                }
+            if (isUpdate) {
+                deviceFlowRegistry.addMark(origFlowRegistryKey);
+                deviceFlowRegistry.storeDescriptor(updatedFlowRegistryKey, updatedFlowDescriptor);
             }
         }
 
         @Override
         public void onFailure(final Throwable throwable) {
-            LOG.warn("Service call for updating flow={} failed, reason: {}", input, throwable);
+            LOG.warn("Service call for updating flow={} failed", input, throwable);
         }
     }
 }