Merge "GetFlowNodeCache cli"
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / services / sal / SalFlowServiceImpl.java
index a164063badf5625e21a98ffe2a8aac25d23a6c5a..a9f4c04e6439e66a8d246ec33ff9d5c571eca8f3 100644 (file)
@@ -1,4 +1,4 @@
-/**
+/*
  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
  *
  * This program and the accompanying materials are made available under the
@@ -7,37 +7,36 @@
  */
 package org.opendaylight.openflowplugin.impl.services.sal;
 
-import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.EvictingQueue;
+import com.google.common.collect.Queues;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.SettableFuture;
+import java.time.LocalDateTime;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Objects;
-import java.util.concurrent.Future;
-import javax.annotation.Nullable;
+import java.util.Queue;
 import org.opendaylight.openflowplugin.api.OFConstants;
+import org.opendaylight.openflowplugin.api.openflow.FlowGroupCache;
+import org.opendaylight.openflowplugin.api.openflow.FlowGroupCacheManager;
+import org.opendaylight.openflowplugin.api.openflow.FlowGroupStatus;
 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
 import org.opendaylight.openflowplugin.api.openflow.device.RequestContextStack;
 import org.opendaylight.openflowplugin.api.openflow.registry.flow.DeviceFlowRegistry;
 import org.opendaylight.openflowplugin.api.openflow.registry.flow.FlowDescriptor;
 import org.opendaylight.openflowplugin.api.openflow.registry.flow.FlowRegistryKey;
-import org.opendaylight.openflowplugin.api.openflow.rpc.ItemLifeCycleSource;
-import org.opendaylight.openflowplugin.api.openflow.rpc.listener.ItemLifecycleListener;
 import org.opendaylight.openflowplugin.impl.registry.flow.FlowDescriptorFactory;
 import org.opendaylight.openflowplugin.impl.registry.flow.FlowRegistryKeyFactory;
-import org.opendaylight.openflowplugin.impl.services.singlelayer.SingleLayerFlowService;
 import org.opendaylight.openflowplugin.impl.services.multilayer.MultiLayerFlowService;
+import org.opendaylight.openflowplugin.impl.services.singlelayer.SingleLayerFlowService;
 import org.opendaylight.openflowplugin.impl.util.ErrorUtil;
+import org.opendaylight.openflowplugin.impl.util.FlowCreatorUtil;
+import org.opendaylight.openflowplugin.impl.util.PathUtil;
 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
-import org.opendaylight.openflowplugin.openflow.md.util.FlowCreatorUtil;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowId;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowOutput;
@@ -49,17 +48,16 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.Upda
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.flow.update.OriginalFlow;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.flow.update.UpdatedFlow;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowModInputBuilder;
-import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
 import org.opendaylight.yangtools.yang.common.RpcError;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
+import org.opendaylight.yangtools.yang.common.Uint8;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class SalFlowServiceImpl implements SalFlowService, ItemLifeCycleSource {
+public class SalFlowServiceImpl implements SalFlowService {
     private static final Logger LOG = LoggerFactory.getLogger(SalFlowServiceImpl.class);
     private final MultiLayerFlowService<UpdateFlowOutput> flowUpdate;
     private final MultiLayerFlowService<AddFlowOutput> flowAdd;
@@ -68,60 +66,74 @@ public class SalFlowServiceImpl implements SalFlowService, ItemLifeCycleSource {
     private final SingleLayerFlowService<UpdateFlowOutput> flowUpdateMessage;
     private final SingleLayerFlowService<RemoveFlowOutput> flowRemoveMessage;
     private final DeviceContext deviceContext;
-    private ItemLifecycleListener itemLifecycleListener;
+    private static final Uint8 OFPTT_ALL = Uint8.MAX_VALUE;
+    private final FlowGroupCacheManager provider;
+    public static final int FLOWGROUP_CACHE_SIZE = 10000;
 
-    public SalFlowServiceImpl(final RequestContextStack requestContextStack, final DeviceContext deviceContext, final ConvertorExecutor convertorExecutor) {
+
+    public SalFlowServiceImpl(final RequestContextStack requestContextStack,
+                              final DeviceContext deviceContext,
+                              final ConvertorExecutor convertorExecutor,
+                              final FlowGroupCacheManager provider) {
         this.deviceContext = deviceContext;
-        flowRemove = new MultiLayerFlowService<>(requestContextStack, deviceContext, RemoveFlowOutput.class, convertorExecutor);
-        flowAdd = new MultiLayerFlowService<>(requestContextStack, deviceContext, AddFlowOutput.class, convertorExecutor);
-        flowUpdate = new MultiLayerFlowService<>(requestContextStack, deviceContext, UpdateFlowOutput.class, convertorExecutor);
+        this.provider = provider;
+        flowRemove = new MultiLayerFlowService<>(requestContextStack,
+                                                 deviceContext,
+                                                 RemoveFlowOutput.class,
+                                                 convertorExecutor);
+        flowAdd = new MultiLayerFlowService<>(requestContextStack,
+                                              deviceContext,
+                                              AddFlowOutput.class,
+                                              convertorExecutor);
+        flowUpdate = new MultiLayerFlowService<>(requestContextStack,
+                                                 deviceContext,
+                                                 UpdateFlowOutput.class,
+                                                 convertorExecutor);
         flowAddMessage = new SingleLayerFlowService<>(requestContextStack, deviceContext, AddFlowOutput.class);
         flowUpdateMessage = new SingleLayerFlowService<>(requestContextStack, deviceContext, UpdateFlowOutput.class);
-        flowRemoveMessage= new SingleLayerFlowService<>(requestContextStack, deviceContext, RemoveFlowOutput.class);
+        flowRemoveMessage = new SingleLayerFlowService<>(requestContextStack, deviceContext, RemoveFlowOutput.class);
     }
 
     @Override
-    public void setItemLifecycleListener(@Nullable ItemLifecycleListener itemLifecycleListener) {
-        this.itemLifecycleListener = itemLifecycleListener;
-    }
-
-    @Override
-    public Future<RpcResult<AddFlowOutput>> addFlow(final AddFlowInput input) {
-        final FlowRegistryKey flowRegistryKey = FlowRegistryKeyFactory.create(deviceContext.getDeviceInfo().getVersion(), input);
+    public ListenableFuture<RpcResult<AddFlowOutput>> addFlow(final AddFlowInput input) {
+        final FlowRegistryKey flowRegistryKey =
+                FlowRegistryKeyFactory.create(deviceContext.getDeviceInfo().getVersion(), input);
         final ListenableFuture<RpcResult<AddFlowOutput>> future;
-
+        NodeId nodeId = PathUtil.extractNodeId(input.getNode());
         if (flowAddMessage.canUseSingleLayerSerialization()) {
             future = flowAddMessage.handleServiceCall(input);
-            Futures.addCallback(future, new AddFlowCallback(input, flowRegistryKey));
+            Futures.addCallback(future, new AddFlowCallback(input, flowRegistryKey, nodeId),
+                    MoreExecutors.directExecutor());
         } else {
             future = flowAdd.processFlowModInputBuilders(flowAdd.toFlowModInputs(input));
-            Futures.addCallback(future, new AddFlowCallback(input, flowRegistryKey));
+            Futures.addCallback(future, new AddFlowCallback(input, flowRegistryKey, nodeId),
+                    MoreExecutors.directExecutor());
 
         }
         return future;
     }
 
     @Override
-    public Future<RpcResult<RemoveFlowOutput>> removeFlow(final RemoveFlowInput input) {
+    public ListenableFuture<RpcResult<RemoveFlowOutput>> removeFlow(final RemoveFlowInput input) {
         final ListenableFuture<RpcResult<RemoveFlowOutput>> future;
-
+        NodeId nodeId = PathUtil.extractNodeId(input.getNode());
         if (flowRemoveMessage.canUseSingleLayerSerialization()) {
             future = flowRemoveMessage.handleServiceCall(input);
-            Futures.addCallback(future, new RemoveFlowCallback(input));
+            Futures.addCallback(future, new RemoveFlowCallback(input, nodeId), MoreExecutors.directExecutor());
 
         } else {
             future = flowRemove.processFlowModInputBuilders(flowRemove.toFlowModInputs(input));
-            Futures.addCallback(future, new RemoveFlowCallback(input));
+            Futures.addCallback(future, new RemoveFlowCallback(input, nodeId), MoreExecutors.directExecutor());
         }
 
         return future;
     }
 
     @Override
-    public Future<RpcResult<UpdateFlowOutput>> updateFlow(final UpdateFlowInput input) {
+    public ListenableFuture<RpcResult<UpdateFlowOutput>> updateFlow(final UpdateFlowInput input) {
         final UpdatedFlow updated = input.getUpdatedFlow();
         final OriginalFlow original = input.getOriginalFlow();
-
+        String nodeId =  PathUtil.extractNodeId(input.getNode()).getValue();
         final List<FlowModInputBuilder> allFlowMods = new ArrayList<>();
         final List<FlowModInputBuilder> ofFlowModInputs;
 
@@ -131,9 +143,9 @@ public class SalFlowServiceImpl implements SalFlowService, ItemLifeCycleSource {
             if (!FlowCreatorUtil.canModifyFlow(original, updated, flowUpdateMessage.getVersion())) {
                 final SettableFuture<RpcResult<UpdateFlowOutput>> objectSettableFuture = SettableFuture.create();
 
-                final ListenableFuture<List<RpcResult<UpdateFlowOutput>>> listListenableFuture = Futures.successfulAsList(
-                        flowUpdateMessage.handleServiceCall(input.getOriginalFlow()),
-                        flowUpdateMessage.handleServiceCall(input.getUpdatedFlow()));
+                final ListenableFuture<List<RpcResult<UpdateFlowOutput>>> listListenableFuture =
+                        Futures.successfulAsList(flowUpdateMessage.handleServiceCall(input.getOriginalFlow()),
+                                                 flowUpdateMessage.handleServiceCall(input.getUpdatedFlow()));
 
                 Futures.addCallback(listListenableFuture, new FutureCallback<List<RpcResult<UpdateFlowOutput>>>() {
                     @Override
@@ -160,11 +172,11 @@ public class SalFlowServiceImpl implements SalFlowService, ItemLifeCycleSource {
                     }
 
                     @Override
-                    public void onFailure(final Throwable t) {
+                    public void onFailure(final Throwable throwable) {
                         RpcResultBuilder<UpdateFlowOutput> rpcResultBuilder = RpcResultBuilder.failed();
                         objectSettableFuture.set(rpcResultBuilder.build());
                     }
-                });
+                }, MoreExecutors.directExecutor());
 
                 future = objectSettableFuture;
             } else {
@@ -190,52 +202,49 @@ public class SalFlowServiceImpl implements SalFlowService, ItemLifeCycleSource {
             future = flowUpdate.processFlowModInputBuilders(allFlowMods);
         }
 
-        Futures.addCallback(future, new UpdateFlowCallback(input));
+        Futures.addCallback(future, new UpdateFlowCallback(input, nodeId), MoreExecutors.directExecutor());
         return future;
     }
 
-    @VisibleForTesting
-    private static KeyedInstanceIdentifier<Flow, FlowKey> createFlowPath(FlowDescriptor flowDescriptor,
-                                                                 KeyedInstanceIdentifier<Node, NodeKey> nodePath) {
-        return nodePath.augmentation(FlowCapableNode.class)
-                .child(Table.class, flowDescriptor.getTableKey())
-                .child(Flow.class, new FlowKey(flowDescriptor.getFlowId()));
-    }
-
-    private class AddFlowCallback implements FutureCallback<RpcResult<AddFlowOutput>> {
+    private final class AddFlowCallback implements FutureCallback<RpcResult<AddFlowOutput>> {
         private final AddFlowInput input;
         private final FlowRegistryKey flowRegistryKey;
+        private final NodeId nodeId;
 
         private AddFlowCallback(final AddFlowInput input,
-                                final FlowRegistryKey flowRegistryKey) {
+                                final FlowRegistryKey flowRegistryKey,
+                                final NodeId nodeId) {
             this.input = input;
             this.flowRegistryKey = flowRegistryKey;
+            this.nodeId = nodeId;
         }
 
         @Override
         public void onSuccess(final RpcResult<AddFlowOutput> rpcResult) {
             if (rpcResult.isSuccessful()) {
                 final FlowDescriptor flowDescriptor;
-
-                if (Objects.nonNull(input.getFlowRef())) {
-                    final FlowId flowId = input.getFlowRef().getValue().firstKeyOf(Flow.class, FlowKey.class).getId();
-                    flowDescriptor = FlowDescriptorFactory.create(input.getTableId(), flowId);
-                    deviceContext.getDeviceFlowRegistry().store(flowRegistryKey, flowDescriptor);
+                final FlowId flowId = input.getFlowRef().getValue().firstKeyOf(Flow.class).getId();
+                FlowGroupCache cache = new FlowGroupCache(flowId.getValue(), input.getTableId().toString(),
+                        FlowGroupStatus.ADDED, LocalDateTime.now());
+                if (provider.getAllNodesFlowGroupCache().containsKey(nodeId.getValue())) {
+                    provider.getAllNodesFlowGroupCache().get(nodeId.getValue()).add(cache);
                 } else {
-                    final FlowId flowId = deviceContext.getDeviceFlowRegistry().storeIfNecessary(flowRegistryKey);
+                    Queue<FlowGroupCache> flowGroupCacheList =
+                            Queues.synchronizedQueue(EvictingQueue.create(FLOWGROUP_CACHE_SIZE));
+                    flowGroupCacheList.add(cache);
+                    provider.getAllNodesFlowGroupCache().put(nodeId.getValue(), flowGroupCacheList);
+                }
+                if (input.getFlowRef() != null) {
                     flowDescriptor = FlowDescriptorFactory.create(input.getTableId(), flowId);
+                    deviceContext.getDeviceFlowRegistry().storeDescriptor(flowRegistryKey, flowDescriptor);
+                } else {
+                    deviceContext.getDeviceFlowRegistry().store(flowRegistryKey);
+                    flowDescriptor = deviceContext.getDeviceFlowRegistry().retrieveDescriptor(flowRegistryKey);
                 }
 
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("Flow add with id={} finished without error", flowDescriptor.getFlowId().getValue());
                 }
-
-                if (itemLifecycleListener != null) {
-                    KeyedInstanceIdentifier<Flow, FlowKey> flowPath = createFlowPath(flowDescriptor,
-                            deviceContext.getDeviceInfo().getNodeInstanceIdentifier());
-                    final FlowBuilder flowBuilder = new FlowBuilder(input).setId(flowDescriptor.getFlowId());
-                    itemLifecycleListener.onAdded(flowPath, flowBuilder.build());
-                }
             } else {
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("Flow add failed for flow={}, errors={}", input,
@@ -246,15 +255,17 @@ public class SalFlowServiceImpl implements SalFlowService, ItemLifeCycleSource {
 
         @Override
         public void onFailure(final Throwable throwable) {
-            LOG.warn("Service call for adding flow={} failed, reason: {}", input, throwable);
+            LOG.warn("Service call for adding flow={} failed", input, throwable);
         }
     }
 
-    private class RemoveFlowCallback implements FutureCallback<RpcResult<RemoveFlowOutput>> {
+    private final class RemoveFlowCallback implements FutureCallback<RpcResult<RemoveFlowOutput>> {
         private final RemoveFlowInput input;
+        private final NodeId nodeId;
 
-        private RemoveFlowCallback(final RemoveFlowInput input) {
+        private RemoveFlowCallback(final RemoveFlowInput input, final NodeId nodeId) {
             this.input = input;
+            this.nodeId = nodeId;
         }
 
         @Override
@@ -263,17 +274,24 @@ public class SalFlowServiceImpl implements SalFlowService, ItemLifeCycleSource {
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("Flow remove finished without error for flow={}", input);
                 }
-                FlowRegistryKey flowRegistryKey = FlowRegistryKeyFactory.create(deviceContext.getDeviceInfo().getVersion(), input);
-                deviceContext.getDeviceFlowRegistry().removeDescriptor(flowRegistryKey);
-
-                if (itemLifecycleListener != null) {
-                    final FlowDescriptor flowDescriptor =
-                            deviceContext.getDeviceFlowRegistry().retrieveIdForFlow(flowRegistryKey);
-                    if (flowDescriptor != null) {
-                        KeyedInstanceIdentifier<Flow, FlowKey> flowPath = createFlowPath(flowDescriptor,
-                                deviceContext.getDeviceInfo().getNodeInstanceIdentifier());
-                        itemLifecycleListener.onRemoved(flowPath);
+                if (input.getTableId() != null && !input.getTableId().equals(OFPTT_ALL)) {
+                    FlowRegistryKey flowRegistryKey =
+                            FlowRegistryKeyFactory.create(deviceContext.getDeviceInfo().getVersion(), input);
+                    deviceContext.getDeviceFlowRegistry().addMark(flowRegistryKey);
+                    final FlowId flowId = input.getFlowRef().getValue().firstKeyOf(Flow.class).getId();
+                    FlowGroupCache cache = new FlowGroupCache(flowId.getValue(),
+                            input.getTableId().toString(), FlowGroupStatus.REMOVED,
+                            LocalDateTime.now());
+                    if (provider.getAllNodesFlowGroupCache().containsKey(nodeId.getValue())) {
+                        provider.getAllNodesFlowGroupCache().get(nodeId.getValue()).add(cache);
+                    } else {
+                        Queue<FlowGroupCache> flowGroupCacheList =
+                                Queues.synchronizedQueue(EvictingQueue.create(FLOWGROUP_CACHE_SIZE));
+                        flowGroupCacheList.add(cache);
+                        provider.getAllNodesFlowGroupCache().put(nodeId.getValue(), flowGroupCacheList);
                     }
+                } else {
+                    deviceContext.getDeviceFlowRegistry().clearFlowRegistry();
                 }
             } else {
                 if (LOG.isDebugEnabled()) {
@@ -285,58 +303,67 @@ public class SalFlowServiceImpl implements SalFlowService, ItemLifeCycleSource {
 
         @Override
         public void onFailure(final Throwable throwable) {
-            LOG.warn("Service call for removing flow={} failed, reason: {}", input, throwable);
+            LOG.warn("Service call for removing flow={} failed", input, throwable);
         }
     }
 
-    private class UpdateFlowCallback implements FutureCallback<RpcResult<UpdateFlowOutput>> {
+    private final class UpdateFlowCallback implements FutureCallback<RpcResult<UpdateFlowOutput>> {
         private final UpdateFlowInput input;
+        private final String nodeId;
 
-        private UpdateFlowCallback(UpdateFlowInput input) {
+        private UpdateFlowCallback(UpdateFlowInput input,  String nodeId) {
             this.input = input;
+            this.nodeId = nodeId;
         }
 
         @Override
-        public void onSuccess(final RpcResult<UpdateFlowOutput> o) {
+        public void onSuccess(final RpcResult<UpdateFlowOutput> updateFlowOutputRpcResult) {
             final DeviceFlowRegistry deviceFlowRegistry = deviceContext.getDeviceFlowRegistry();
-
+            final FlowId flowId = input.getFlowRef().getValue().firstKeyOf(Flow.class).getId();
             final UpdatedFlow updated = input.getUpdatedFlow();
             final OriginalFlow original = input.getOriginalFlow();
-            final FlowRegistryKey origFlowRegistryKey = FlowRegistryKeyFactory.create(deviceContext.getDeviceInfo().getVersion(), original);
-            final FlowRegistryKey updatedFlowRegistryKey = FlowRegistryKeyFactory.create(deviceContext.getDeviceInfo().getVersion(), updated);
-            final FlowDescriptor origFlowDescriptor = deviceFlowRegistry.retrieveIdForFlow(origFlowRegistryKey);
-
-            final boolean isUpdate = Objects.nonNull(origFlowDescriptor);
-            final FlowId fLowId = Objects.nonNull(input.getFlowRef())
-                    ? input.getFlowRef().getValue().firstKeyOf(Flow.class).getId()
-                    : isUpdate ? origFlowDescriptor.getFlowId() : deviceFlowRegistry.storeIfNecessary(updatedFlowRegistryKey);
-            final FlowDescriptor updatedFlowDescriptor = FlowDescriptorFactory.create(updated.getTableId(), fLowId);
-            if (isUpdate) {
-                deviceFlowRegistry.removeDescriptor(origFlowRegistryKey);
-                deviceFlowRegistry.store(updatedFlowRegistryKey, updatedFlowDescriptor);
+            final FlowRegistryKey origFlowRegistryKey =
+                    FlowRegistryKeyFactory.create(deviceContext.getDeviceInfo().getVersion(), original);
+            final FlowRegistryKey updatedFlowRegistryKey =
+                    FlowRegistryKeyFactory.create(deviceContext.getDeviceInfo().getVersion(), updated);
+            final FlowDescriptor origFlowDescriptor = deviceFlowRegistry.retrieveDescriptor(origFlowRegistryKey);
+
+            final boolean isUpdate = origFlowDescriptor != null;
+            final FlowDescriptor updatedFlowDescriptor;
+            FlowGroupCache cache = new FlowGroupCache(flowId.getValue(), updated.getTableId().toString(),
+                    FlowGroupStatus.MODIFIED,
+                    LocalDateTime.now());
+            if (provider.getAllNodesFlowGroupCache().containsKey(nodeId)) {
+                provider.getAllNodesFlowGroupCache().get(nodeId).add(cache);
+            } else {
+                Queue<FlowGroupCache> flowGroupCacheList =
+                        Queues.synchronizedQueue(EvictingQueue.create(FLOWGROUP_CACHE_SIZE));
+                flowGroupCacheList.add(cache);
+                provider.getAllNodesFlowGroupCache().put(nodeId, flowGroupCacheList);
             }
 
-            if (itemLifecycleListener != null) {
-                final KeyedInstanceIdentifier<Flow, FlowKey> flowPath =
-                        createFlowPath(
-                                updatedFlowDescriptor,
-                                deviceContext.getDeviceInfo().getNodeInstanceIdentifier());
-
-                final Flow flow = new FlowBuilder(updated)
-                        .setId(updatedFlowDescriptor.getFlowId())
-                        .build();
-
-                if (Objects.nonNull(origFlowDescriptor)) {
-                    itemLifecycleListener.onUpdated(flowPath, flow);
+            if (input.getFlowRef() != null) {
+                updatedFlowDescriptor =
+                        FlowDescriptorFactory.create(updated.getTableId(),
+                                                     input.getFlowRef().getValue().firstKeyOf(Flow.class).getId());
+            } else {
+                if (isUpdate) {
+                    updatedFlowDescriptor = origFlowDescriptor;
                 } else {
-                    itemLifecycleListener.onAdded(flowPath, flow);
+                    deviceFlowRegistry.store(updatedFlowRegistryKey);
+                    updatedFlowDescriptor = deviceFlowRegistry.retrieveDescriptor(updatedFlowRegistryKey);
                 }
             }
+
+            if (isUpdate) {
+                deviceFlowRegistry.addMark(origFlowRegistryKey);
+                deviceFlowRegistry.storeDescriptor(updatedFlowRegistryKey, updatedFlowDescriptor);
+            }
         }
 
         @Override
         public void onFailure(final Throwable throwable) {
-            LOG.warn("Service call for updating flow={} failed, reason: {}", input, throwable);
+            LOG.warn("Service call for updating flow={} failed", input, throwable);
         }
     }
 }