SONAR TD - Sal*ServiceImpl 80/43980/1
authorTomas Slusny <tomas.slusny@pantheon.sk>
Wed, 3 Aug 2016 11:51:46 +0000 (13:51 +0200)
committerJozef Bacigal <jozef.bacigal@pantheon.tech>
Mon, 15 Aug 2016 13:31:27 +0000 (13:31 +0000)
- Changes some anonymous classes to named inner classes
- Shortened big lambda in SalFlatBatchService

Change-Id: I04d9f73ba92928144b2de1d2a86b8c4feac0ad22
Signed-off-by: Tomas Slusny <tomas.slusny@pantheon.sk>
(cherry picked from commit 5c2b5676715352cc935daa146c660e6b79c7bd03)

openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/services/SalFlatBatchServiceImpl.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/services/SalFlowServiceImpl.java

index 05a88b15333626c1d6d09881ef460f43884a70f9..0967ffa0d18cc628a9c8a36d91ba650eed1e0a0a 100644 (file)
@@ -120,58 +120,7 @@ public class SalFlatBatchServiceImpl implements SalFlatBatchService {
                 LOG.trace("batch progressing on step type {}", planStep.getStepType());
                 LOG.trace("batch progressing previous steps result: {}", chainInput.isSuccessful());
 
-                final ListenableFuture<RpcResult<ProcessFlatBatchOutput>> chainOutput;
-                switch (planStep.getStepType()) {
-                    case FLOW_ADD:
-                        final AddFlowsBatchInput addFlowsBatchInput = FlatBatchFlowAdapters.adaptFlatBatchAddFlow(planStep, node);
-                        final Future<RpcResult<AddFlowsBatchOutput>> resultAddFlowFuture = salFlowService.addFlowsBatch(addFlowsBatchInput);
-                        chainOutput = FlatBatchFlowAdapters.convertFlowBatchFutureForChain(resultAddFlowFuture, currentOffset);
-                        break;
-                    case FLOW_REMOVE:
-                        final RemoveFlowsBatchInput removeFlowsBatchInput = FlatBatchFlowAdapters.adaptFlatBatchRemoveFlow(planStep, node);
-                        final Future<RpcResult<RemoveFlowsBatchOutput>> resultRemoveFlowFuture = salFlowService.removeFlowsBatch(removeFlowsBatchInput);
-                        chainOutput = FlatBatchFlowAdapters.convertFlowBatchFutureForChain(resultRemoveFlowFuture, currentOffset);
-                        break;
-                    case FLOW_UPDATE:
-                        final UpdateFlowsBatchInput updateFlowsBatchInput = FlatBatchFlowAdapters.adaptFlatBatchUpdateFlow(planStep, node);
-                        final Future<RpcResult<UpdateFlowsBatchOutput>> resultUpdateFlowFuture = salFlowService.updateFlowsBatch(updateFlowsBatchInput);
-                        chainOutput = FlatBatchFlowAdapters.convertFlowBatchFutureForChain(resultUpdateFlowFuture, currentOffset);
-                        break;
-                    case GROUP_ADD:
-                        final AddGroupsBatchInput addGroupsBatchInput = FlatBatchGroupAdapters.adaptFlatBatchAddGroup(planStep, node);
-                        final Future<RpcResult<AddGroupsBatchOutput>> resultAddGroupFuture = salGroupService.addGroupsBatch(addGroupsBatchInput);
-                        chainOutput = FlatBatchGroupAdapters.convertGroupBatchFutureForChain(resultAddGroupFuture, currentOffset);
-                        break;
-                    case GROUP_REMOVE:
-                        final RemoveGroupsBatchInput removeGroupsBatchInput = FlatBatchGroupAdapters.adaptFlatBatchRemoveGroup(planStep, node);
-                        final Future<RpcResult<RemoveGroupsBatchOutput>> resultRemoveGroupFuture = salGroupService.removeGroupsBatch(removeGroupsBatchInput);
-                        chainOutput = FlatBatchGroupAdapters.convertGroupBatchFutureForChain(resultRemoveGroupFuture, currentOffset);
-                        break;
-                    case GROUP_UPDATE:
-                        final UpdateGroupsBatchInput updateGroupsBatchInput = FlatBatchGroupAdapters.adaptFlatBatchUpdateGroup(planStep, node);
-                        final Future<RpcResult<UpdateGroupsBatchOutput>> resultUpdateGroupFuture = salGroupService.updateGroupsBatch(updateGroupsBatchInput);
-                        chainOutput = FlatBatchGroupAdapters.convertGroupBatchFutureForChain(resultUpdateGroupFuture, currentOffset);
-                        break;
-                    case METER_ADD:
-                        final AddMetersBatchInput addMetersBatchInput = FlatBatchMeterAdapters.adaptFlatBatchAddMeter(planStep, node);
-                        final Future<RpcResult<AddMetersBatchOutput>> resultAddMeterFuture = salMeterService.addMetersBatch(addMetersBatchInput);
-                        chainOutput = FlatBatchMeterAdapters.convertMeterBatchFutureForChain(resultAddMeterFuture, currentOffset);
-                        break;
-                    case METER_REMOVE:
-                        final RemoveMetersBatchInput removeMetersBatchInput = FlatBatchMeterAdapters.adaptFlatBatchRemoveMeter(planStep, node);
-                        final Future<RpcResult<RemoveMetersBatchOutput>> resultRemoveMeterFuture = salMeterService.removeMetersBatch(removeMetersBatchInput);
-                        chainOutput = FlatBatchMeterAdapters.convertMeterBatchFutureForChain(resultRemoveMeterFuture, currentOffset);
-                        break;
-                    case METER_UPDATE:
-                        final UpdateMetersBatchInput updateMetersBatchInput = FlatBatchMeterAdapters.adaptFlatBatchUpdateMeter(planStep, node);
-                        final Future<RpcResult<UpdateMetersBatchOutput>> resultUpdateMeterFuture = salMeterService.updateMetersBatch(updateMetersBatchInput);
-                        chainOutput = FlatBatchMeterAdapters.convertMeterBatchFutureForChain(resultUpdateMeterFuture, currentOffset);
-                        break;
-                    default:
-                        LOG.warn("Unsupported plan-step type occurred: {} -> OMITTING", planStep.getStepType());
-                        chainOutput = FlatBatchUtil.createEmptyRpcBatchResultFuture(true);
-                }
-                return chainOutput;
+                return getChainOutput(node, planStep, currentOffset);
             }));
             stepOffset += planStep.getTaskBag().size();
         }
@@ -179,4 +128,62 @@ public class SalFlatBatchServiceImpl implements SalFlatBatchService {
         return chainJobs;
     }
 
+    private ListenableFuture<RpcResult<ProcessFlatBatchOutput>> getChainOutput(final NodeRef node,
+                                                                               final BatchPlanStep planStep,
+                                                                               final int currentOffset) {
+        final ListenableFuture<RpcResult<ProcessFlatBatchOutput>> chainOutput;
+
+        switch (planStep.getStepType()) {
+            case FLOW_ADD:
+                final AddFlowsBatchInput addFlowsBatchInput = FlatBatchFlowAdapters.adaptFlatBatchAddFlow(planStep, node);
+                final Future<RpcResult<AddFlowsBatchOutput>> resultAddFlowFuture = salFlowService.addFlowsBatch(addFlowsBatchInput);
+                chainOutput = FlatBatchFlowAdapters.convertFlowBatchFutureForChain(resultAddFlowFuture, currentOffset);
+                break;
+            case FLOW_REMOVE:
+                final RemoveFlowsBatchInput removeFlowsBatchInput = FlatBatchFlowAdapters.adaptFlatBatchRemoveFlow(planStep, node);
+                final Future<RpcResult<RemoveFlowsBatchOutput>> resultRemoveFlowFuture = salFlowService.removeFlowsBatch(removeFlowsBatchInput);
+                chainOutput = FlatBatchFlowAdapters.convertFlowBatchFutureForChain(resultRemoveFlowFuture, currentOffset);
+                break;
+            case FLOW_UPDATE:
+                final UpdateFlowsBatchInput updateFlowsBatchInput = FlatBatchFlowAdapters.adaptFlatBatchUpdateFlow(planStep, node);
+                final Future<RpcResult<UpdateFlowsBatchOutput>> resultUpdateFlowFuture = salFlowService.updateFlowsBatch(updateFlowsBatchInput);
+                chainOutput = FlatBatchFlowAdapters.convertFlowBatchFutureForChain(resultUpdateFlowFuture, currentOffset);
+                break;
+            case GROUP_ADD:
+                final AddGroupsBatchInput addGroupsBatchInput = FlatBatchGroupAdapters.adaptFlatBatchAddGroup(planStep, node);
+                final Future<RpcResult<AddGroupsBatchOutput>> resultAddGroupFuture = salGroupService.addGroupsBatch(addGroupsBatchInput);
+                chainOutput = FlatBatchGroupAdapters.convertGroupBatchFutureForChain(resultAddGroupFuture, currentOffset);
+                break;
+            case GROUP_REMOVE:
+                final RemoveGroupsBatchInput removeGroupsBatchInput = FlatBatchGroupAdapters.adaptFlatBatchRemoveGroup(planStep, node);
+                final Future<RpcResult<RemoveGroupsBatchOutput>> resultRemoveGroupFuture = salGroupService.removeGroupsBatch(removeGroupsBatchInput);
+                chainOutput = FlatBatchGroupAdapters.convertGroupBatchFutureForChain(resultRemoveGroupFuture, currentOffset);
+                break;
+            case GROUP_UPDATE:
+                final UpdateGroupsBatchInput updateGroupsBatchInput = FlatBatchGroupAdapters.adaptFlatBatchUpdateGroup(planStep, node);
+                final Future<RpcResult<UpdateGroupsBatchOutput>> resultUpdateGroupFuture = salGroupService.updateGroupsBatch(updateGroupsBatchInput);
+                chainOutput = FlatBatchGroupAdapters.convertGroupBatchFutureForChain(resultUpdateGroupFuture, currentOffset);
+                break;
+            case METER_ADD:
+                final AddMetersBatchInput addMetersBatchInput = FlatBatchMeterAdapters.adaptFlatBatchAddMeter(planStep, node);
+                final Future<RpcResult<AddMetersBatchOutput>> resultAddMeterFuture = salMeterService.addMetersBatch(addMetersBatchInput);
+                chainOutput = FlatBatchMeterAdapters.convertMeterBatchFutureForChain(resultAddMeterFuture, currentOffset);
+                break;
+            case METER_REMOVE:
+                final RemoveMetersBatchInput removeMetersBatchInput = FlatBatchMeterAdapters.adaptFlatBatchRemoveMeter(planStep, node);
+                final Future<RpcResult<RemoveMetersBatchOutput>> resultRemoveMeterFuture = salMeterService.removeMetersBatch(removeMetersBatchInput);
+                chainOutput = FlatBatchMeterAdapters.convertMeterBatchFutureForChain(resultRemoveMeterFuture, currentOffset);
+                break;
+            case METER_UPDATE:
+                final UpdateMetersBatchInput updateMetersBatchInput = FlatBatchMeterAdapters.adaptFlatBatchUpdateMeter(planStep, node);
+                final Future<RpcResult<UpdateMetersBatchOutput>> resultUpdateMeterFuture = salMeterService.updateMetersBatch(updateMetersBatchInput);
+                chainOutput = FlatBatchMeterAdapters.convertMeterBatchFutureForChain(resultUpdateMeterFuture, currentOffset);
+                break;
+            default:
+                LOG.warn("Unsupported plan-step type occurred: {} -> OMITTING", planStep.getStepType());
+                chainOutput = FlatBatchUtil.createEmptyRpcBatchResultFuture(true);
+        }
+
+        return chainOutput;
+    }
 }
index 8000e8e1245e8bf30e50079081b342872b48b3a0..db4fb135fdf9d95467c0b7eeee9334b323b63284 100644 (file)
@@ -78,47 +78,9 @@ public class SalFlowServiceImpl implements SalFlowService, ItemLifeCycleSource {
     @Override
     public Future<RpcResult<AddFlowOutput>> addFlow(final AddFlowInput input) {
         final FlowRegistryKey flowRegistryKey = FlowRegistryKeyFactory.create(input);
-
         final ListenableFuture<RpcResult<AddFlowOutput>> future =
                 flowAdd.processFlowModInputBuilders(flowAdd.toFlowModInputs(input));
-        Futures.addCallback(future, new FutureCallback<RpcResult<AddFlowOutput>>() {
-            @Override
-            public void onSuccess(final RpcResult<AddFlowOutput> rpcResult) {
-                if (rpcResult.isSuccessful()) {
-                    final FlowId flowId;
-                    final FlowDescriptor flowDescriptor;
-
-                    if (Objects.nonNull(input.getFlowRef())) {
-                        flowId = input.getFlowRef().getValue().firstKeyOf(Flow.class, FlowKey.class).getId();
-                        flowDescriptor = FlowDescriptorFactory.create(input.getTableId(), flowId);
-                        deviceContext.getDeviceFlowRegistry().store(flowRegistryKey, flowDescriptor);
-
-                    } else {
-                        flowId = deviceContext.getDeviceFlowRegistry().storeIfNecessary(flowRegistryKey);
-                        flowDescriptor = FlowDescriptorFactory.create(input.getTableId(), flowId);
-                    }
-
-                    if(LOG.isDebugEnabled()) {
-                        LOG.debug("flow add with id={},finished without error,", flowId.getValue());
-                    }
-                    if (itemLifecycleListener != null) {
-                        KeyedInstanceIdentifier<Flow, FlowKey> flowPath = createFlowPath(flowDescriptor,
-                                deviceContext.getDeviceInfo().getNodeInstanceIdentifier());
-                        final FlowBuilder flowBuilder = new FlowBuilder(input).setId(flowDescriptor.getFlowId());
-                        itemLifecycleListener.onAdded(flowPath, flowBuilder.build());
-                    }
-                } else {
-                    deviceContext.getDeviceFlowRegistry().markToBeremoved(flowRegistryKey);
-                    LOG.error("flow add failed for flow={}, errors={}", input.toString(),
-                            errorsToString(rpcResult.getErrors()));
-                }
-            }
-
-            @Override
-            public void onFailure(final Throwable throwable) {
-               LOG.error("Service call for adding flow={} failed, reason {} .", input.toString(), throwable);
-            }
-        });
+        Futures.addCallback(future, new AddFlowCallback(input, flowRegistryKey));
 
         return future;
     }
@@ -129,34 +91,7 @@ public class SalFlowServiceImpl implements SalFlowService, ItemLifeCycleSource {
 
         final ListenableFuture<RpcResult<RemoveFlowOutput>> future =
                 flowRemove.processFlowModInputBuilders(flowRemove.toFlowModInputs(input));
-        Futures.addCallback(future, new FutureCallback<RpcResult<RemoveFlowOutput>>() {
-            @Override
-            public void onSuccess(final RpcResult<RemoveFlowOutput> result) {
-                if (result.isSuccessful()) {
-                    if(LOG.isDebugEnabled()) {
-                        LOG.debug("flow removed finished without error,");
-                    }
-                    FlowRegistryKey flowRegistryKey = FlowRegistryKeyFactory.create(input);
-                    deviceContext.getDeviceFlowRegistry().markToBeremoved(flowRegistryKey);
-                    if (itemLifecycleListener != null) {
-                        final FlowDescriptor flowDescriptor =
-                                deviceContext.getDeviceFlowRegistry().retrieveIdForFlow(flowRegistryKey);
-                        if (flowDescriptor != null) {
-                            KeyedInstanceIdentifier<Flow, FlowKey> flowPath = createFlowPath(flowDescriptor,
-                                    deviceContext.getDeviceInfo().getNodeInstanceIdentifier());
-                            itemLifecycleListener.onRemoved(flowPath);
-                        }
-                    }
-                } else {
-                    LOG.error("Flow remove failed with errors : {}",errorsToString(result.getErrors()));
-                }
-            }
-
-            @Override
-            public void onFailure(final Throwable throwable) {
-                LOG.error("Service call for removing flow with id {} failed ,reason {}",input.getFlowRef().getValue(), throwable);
-            }
-        });
+        Futures.addCallback(future, new RemoveFlowCallback(input));
 
         return future;
     }
@@ -173,9 +108,8 @@ public class SalFlowServiceImpl implements SalFlowService, ItemLifeCycleSource {
 
     @Override
     public Future<RpcResult<UpdateFlowOutput>> updateFlow(final UpdateFlowInput input) {
-        final UpdateFlowInput in = input;
-        final UpdatedFlow updated = in.getUpdatedFlow();
-        final OriginalFlow original = in.getOriginalFlow();
+        final UpdatedFlow updated = input.getUpdatedFlow();
+        final OriginalFlow original = input.getOriginalFlow();
 
         final List<FlowModInputBuilder> allFlowMods = new ArrayList<>();
         final List<FlowModInputBuilder> ofFlowModInputs;
@@ -196,60 +130,150 @@ public class SalFlowServiceImpl implements SalFlowService, ItemLifeCycleSource {
 
         allFlowMods.addAll(ofFlowModInputs);
         ListenableFuture<RpcResult<UpdateFlowOutput>> future = flowUpdate.processFlowModInputBuilders(allFlowMods);
-        Futures.addCallback(future, new FutureCallback<RpcResult<UpdateFlowOutput>>() {
-            @Override
-            public void onSuccess(final RpcResult<UpdateFlowOutput> o) {
-                FlowRegistryKey flowRegistryKey = FlowRegistryKeyFactory.create(original);
+        Futures.addCallback(future, new UpdateFlowCallback(input));
+        return future;
+    }
+
+    @VisibleForTesting
+    static KeyedInstanceIdentifier<Flow, FlowKey> createFlowPath(FlowDescriptor flowDescriptor,
+                                                                 KeyedInstanceIdentifier<Node, NodeKey> nodePath) {
+        return nodePath.augmentation(FlowCapableNode.class)
+                .child(Table.class, flowDescriptor.getTableKey())
+                .child(Flow.class, new FlowKey(flowDescriptor.getFlowId()));
+    }
 
-                FlowRegistryKey updatedflowRegistryKey = FlowRegistryKeyFactory.create(updated);
-                final FlowRef flowRef = input.getFlowRef();
-                final DeviceFlowRegistry deviceFlowRegistry = deviceContext.getDeviceFlowRegistry();
+    private class AddFlowCallback implements FutureCallback<RpcResult<AddFlowOutput>> {
+        private final AddFlowInput input;
+        private final FlowRegistryKey flowRegistryKey;
 
-                if (flowRef == null) { //then this is equivalent to a delete
-                    deviceFlowRegistry.markToBeremoved(flowRegistryKey);
+        private AddFlowCallback(final AddFlowInput input,
+                                final FlowRegistryKey flowRegistryKey) {
+            this.input = input;
+            this.flowRegistryKey = flowRegistryKey;
+        }
 
-                    if (itemLifecycleListener != null) {
-                        final FlowDescriptor flowDescriptor =
-                                deviceContext.getDeviceFlowRegistry().retrieveIdForFlow( flowRegistryKey);
-                        KeyedInstanceIdentifier<Flow, FlowKey> flowPath = createFlowPath(flowDescriptor,
-                                deviceContext.getDeviceInfo().getNodeInstanceIdentifier());
-                        itemLifecycleListener.onRemoved(flowPath);
-                    }
-                } else { //this is either an add or an update
-                    final FlowId flowId = flowRef.getValue().firstKeyOf(Flow.class, FlowKey.class).getId();
-                    final FlowDescriptor flowDescriptor = FlowDescriptorFactory.create(updated.getTableId(), flowId);
-                    deviceFlowRegistry.store(updatedflowRegistryKey, flowDescriptor);
+        @Override
+        public void onSuccess(final RpcResult<AddFlowOutput> rpcResult) {
+            if (rpcResult.isSuccessful()) {
+                final FlowDescriptor flowDescriptor;
+
+                if (Objects.nonNull(input.getFlowRef())) {
+                    final FlowId flowId = input.getFlowRef().getValue().firstKeyOf(Flow.class, FlowKey.class).getId();
+                    flowDescriptor = FlowDescriptorFactory.create(input.getTableId(), flowId);
+                    deviceContext.getDeviceFlowRegistry().store(flowRegistryKey, flowDescriptor);
+                } else {
+                    final FlowId flowId = deviceContext.getDeviceFlowRegistry().storeIfNecessary(flowRegistryKey);
+                    flowDescriptor = FlowDescriptorFactory.create(input.getTableId(), flowId);
+                }
 
-                    if (itemLifecycleListener != null) {
+                LOG.debug("flow add with id={},finished without error,", flowDescriptor.getFlowId().getValue());
+
+                if (itemLifecycleListener != null) {
+                    KeyedInstanceIdentifier<Flow, FlowKey> flowPath = createFlowPath(flowDescriptor,
+                            deviceContext.getDeviceInfo().getNodeInstanceIdentifier());
+                    final FlowBuilder flowBuilder = new FlowBuilder(input).setId(flowDescriptor.getFlowId());
+                    itemLifecycleListener.onAdded(flowPath, flowBuilder.build());
+                }
+            } else {
+                LOG.error("flow add failed for flow={}, errors={}", input.toString(), errorsToString(rpcResult.getErrors()));
+            }
+        }
+
+        @Override
+        public void onFailure(final Throwable throwable) {
+            LOG.error("Service call for adding flow={} failed, reason {} .", input.toString(), throwable);
+        }
+    }
+
+    private class RemoveFlowCallback implements FutureCallback<RpcResult<RemoveFlowOutput>> {
+        private final RemoveFlowInput input;
+
+        private RemoveFlowCallback(final RemoveFlowInput input) {
+            this.input = input;
+        }
+
+        @Override
+        public void onSuccess(final RpcResult<RemoveFlowOutput> result) {
+            if (result.isSuccessful()) {
+                if(LOG.isDebugEnabled()) {
+                    LOG.debug("flow removed finished without error,");
+                }
+                FlowRegistryKey flowRegistryKey = FlowRegistryKeyFactory.create(input);
+                deviceContext.getDeviceFlowRegistry().markToBeremoved(flowRegistryKey);
+                if (itemLifecycleListener != null) {
+                    final FlowDescriptor flowDescriptor =
+                            deviceContext.getDeviceFlowRegistry().retrieveIdForFlow(flowRegistryKey);
+                    if (flowDescriptor != null) {
                         KeyedInstanceIdentifier<Flow, FlowKey> flowPath = createFlowPath(flowDescriptor,
                                 deviceContext.getDeviceInfo().getNodeInstanceIdentifier());
-                        final FlowBuilder flowBuilder = new FlowBuilder(
-                                                    input.getUpdatedFlow()).setId(flowDescriptor.getFlowId());
-
-                        boolean isUpdate = null !=
-                                            deviceFlowRegistry.retrieveIdForFlow(flowRegistryKey);
-                        if (isUpdate) {
-                            itemLifecycleListener.onUpdated(flowPath, flowBuilder.build());
-                        } else {
-                            itemLifecycleListener.onAdded(flowPath, flowBuilder.build());
-                        }
+                        itemLifecycleListener.onRemoved(flowPath);
                     }
                 }
+            } else {
+                LOG.error("Flow remove failed with errors : {}",errorsToString(result.getErrors()));
             }
+        }
 
-            @Override
-            public void onFailure(final Throwable throwable) {
-                LOG.error("Service call for updating flow failed, reason{}", throwable);
-            }
-        });
-        return future;
+        @Override
+        public void onFailure(final Throwable throwable) {
+            LOG.error("Service call for removing flow with id {} failed ,reason {}",input.getFlowRef().getValue(), throwable);
+        }
     }
 
-    @VisibleForTesting
-    static KeyedInstanceIdentifier<Flow, FlowKey> createFlowPath(FlowDescriptor flowDescriptor,
-                                                                 KeyedInstanceIdentifier<Node, NodeKey> nodePath) {
-        return nodePath.augmentation(FlowCapableNode.class)
-                .child(Table.class, flowDescriptor.getTableKey())
-                .child(Flow.class, new FlowKey(flowDescriptor.getFlowId()));
+    private class UpdateFlowCallback implements FutureCallback<RpcResult<UpdateFlowOutput>> {
+        private final UpdateFlowInput input;
+
+        private UpdateFlowCallback(UpdateFlowInput input) {
+            this.input = input;
+        }
+
+        @Override
+        public void onSuccess(final RpcResult<UpdateFlowOutput> o) {
+            final UpdatedFlow updated = input.getUpdatedFlow();
+            final OriginalFlow original = input.getOriginalFlow();
+            FlowRegistryKey flowRegistryKey = FlowRegistryKeyFactory.create(original);
+
+            FlowRegistryKey updatedflowRegistryKey = FlowRegistryKeyFactory.create(updated);
+            final FlowRef flowRef = input.getFlowRef();
+            final DeviceFlowRegistry deviceFlowRegistry = deviceContext.getDeviceFlowRegistry();
+
+            if (flowRef == null) {
+                // then this is equivalent to a delete
+                deviceFlowRegistry.markToBeremoved(flowRegistryKey);
+
+                if (itemLifecycleListener != null) {
+                    final FlowDescriptor flowDescriptor =
+                            deviceContext.getDeviceFlowRegistry().retrieveIdForFlow( flowRegistryKey);
+                    KeyedInstanceIdentifier<Flow, FlowKey> flowPath = createFlowPath(flowDescriptor,
+                            deviceContext.getDeviceInfo().getNodeInstanceIdentifier());
+                    itemLifecycleListener.onRemoved(flowPath);
+                }
+            } else {
+                // this is either an add or an update
+                final FlowId flowId = flowRef.getValue().firstKeyOf(Flow.class, FlowKey.class).getId();
+                final FlowDescriptor flowDescriptor = FlowDescriptorFactory.create(updated.getTableId(), flowId);
+                deviceFlowRegistry.store(updatedflowRegistryKey, flowDescriptor);
+
+                if (itemLifecycleListener != null) {
+                    KeyedInstanceIdentifier<Flow, FlowKey> flowPath = createFlowPath(flowDescriptor,
+                            deviceContext.getDeviceInfo().getNodeInstanceIdentifier());
+                    final FlowBuilder flowBuilder = new FlowBuilder(
+                            input.getUpdatedFlow()).setId(flowDescriptor.getFlowId());
+
+                    boolean isUpdate = null !=
+                            deviceFlowRegistry.retrieveIdForFlow(flowRegistryKey);
+                    if (isUpdate) {
+                        itemLifecycleListener.onUpdated(flowPath, flowBuilder.build());
+                    } else {
+                        itemLifecycleListener.onAdded(flowPath, flowBuilder.build());
+                    }
+                }
+            }
+        }
+
+        @Override
+        public void onFailure(final Throwable throwable) {
+            LOG.error("Service call for updating flow failed, reason{}", throwable);
+        }
     }
 }
\ No newline at end of file