Fix statistics race condition on big flows 45/53545/1
authorTomas Slusny <tomas.slusny@pantheon.tech>
Fri, 24 Feb 2017 08:40:45 +0000 (09:40 +0100)
committerTomas Slusny <tomas.slusny@pantheon.tech>
Mon, 20 Mar 2017 09:17:43 +0000 (10:17 +0100)
Fix race condition that happens when we receive statistics faster
than flow is deleted from device, but we already sent request for
deletion and removed this flow from device registy.

Resolves: bug 6917

Change-Id: I95510fbc5d507eaffc34d58b3b7743e60911b541
Signed-off-by: Tomas Slusny <tomas.slusny@pantheon.tech>
25 files changed:
openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/registry/CommonDeviceRegistry.java [new file with mode: 0644]
openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/registry/flow/DeviceFlowRegistry.java
openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/registry/flow/FlowDescriptor.java
openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/registry/group/DeviceGroupRegistry.java
openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/registry/meter/DeviceMeterRegistry.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceContextImpl.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/registry/flow/DeviceFlowRegistryImpl.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/registry/group/DeviceGroupRegistryImpl.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/registry/meter/DeviceMeterRegistryImpl.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/services/MultipartRequestOnTheFlyCallback.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/services/SalFlowServiceImpl.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/services/SalGroupServiceImpl.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/services/SalMeterServiceImpl.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/statistics/StatisticsGatheringUtils.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/statistics/services/direct/FlowDirectStatisticsService.java
openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/device/DeviceContextImplTest.java
openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/registry/flow/DeviceFlowRegistryImplTest.java
openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/registry/group/DeviceGroupRegistryImplTest.java
openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/registry/meter/DeviceMeterRegistryImplTest.java
openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/services/MultipartRequestOnTheFlyCallbackTest.java
openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/services/SalFlowServiceImplTest.java
openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/services/SalGroupServiceImplTest.java
openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/services/SalMeterServiceImplTest.java
openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/statistics/StatisticsGatheringUtilsTest.java
openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/statistics/services/direct/FlowDirectStatisticsServiceTest.java

diff --git a/openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/registry/CommonDeviceRegistry.java b/openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/registry/CommonDeviceRegistry.java
new file mode 100644 (file)
index 0000000..afa1d03
--- /dev/null
@@ -0,0 +1,54 @@
+/*
+ * Copyright (c) 2017 Pantheon Technologies s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.api.openflow.registry;
+
+import java.util.function.Consumer;
+
+public interface CommonDeviceRegistry<KEY> extends AutoCloseable {
+
+    /**
+     * Store KEY in device registry.
+     * @param key device registry key
+     */
+    void store(KEY key);
+
+    /**
+     * Add mark for specified KEY.
+     * @param key device registry key
+     */
+    void addMark(KEY key);
+
+    /**
+     * Checks if registry has mark for KEY.
+     * @param key device registry key
+     * @return true if device registry has mark for KEY
+     */
+    boolean hasMark(KEY key);
+
+    /**
+     * Process marked keys.
+     */
+    void processMarks();
+
+    /**
+     * Iterate over all keys in device registry.
+     * @param consumer key consumer
+     */
+    void forEach(Consumer<KEY> consumer);
+
+    /**
+     * Get device registry size.
+     * @return device registry size
+     */
+    int size();
+
+    @Override
+    void close();
+
+}
index e43433f4fb5692f4cbf0f94f4c630fd0b27bb3d3..d98f56b3f4cbf1a5b0ab033d31a2818a4ecb1d40 100644 (file)
@@ -12,30 +12,22 @@ package org.opendaylight.openflowplugin.api.openflow.registry.flow;
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.ListenableFuture;
 import java.util.List;
-import java.util.Map;
+import java.util.function.BiConsumer;
+import org.opendaylight.openflowplugin.api.openflow.registry.CommonDeviceRegistry;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowId;
 
 /**
  * Registry for mapping composite-key of flow ({@link FlowRegistryKey}) from device view
  * to flow descriptor ({@link FlowDescriptor}) as the identifier of the same flow in data store.
  */
-public interface DeviceFlowRegistry extends AutoCloseable {
+public interface DeviceFlowRegistry extends CommonDeviceRegistry<FlowRegistryKey> {
 
     ListenableFuture<List<Optional<FlowCapableNode>>> fill();
 
-    FlowDescriptor retrieveIdForFlow(FlowRegistryKey flowRegistryKey);
+    void storeDescriptor(FlowRegistryKey flowRegistryKey, FlowDescriptor flowDescriptor);
 
-    void store(FlowRegistryKey flowRegistryKey, FlowDescriptor flowDescriptor);
+    FlowDescriptor retrieveDescriptor(FlowRegistryKey flowRegistryKey);
 
-    FlowId storeIfNecessary(FlowRegistryKey flowRegistryKey);
+    void forEachEntry(BiConsumer<FlowRegistryKey, FlowDescriptor> consumer);
 
-    void removeDescriptor(FlowRegistryKey flowRegistryKey);
-
-    void update(FlowRegistryKey newFlowRegistryKey,FlowDescriptor flowDescriptor);
-
-    Map<FlowRegistryKey, FlowDescriptor> getAllFlowDescriptors();
-
-    @Override
-    void close();
-}
\ No newline at end of file
+}
index dd9cff527f31dcbeeaff76816b661cfe6c6eaf3a..24fcc945d1f0ef4f7c712b89bb5758f47518ee37 100644 (file)
@@ -11,12 +11,10 @@ package org.opendaylight.openflowplugin.api.openflow.registry.flow;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowId;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
 
-/**
- * Created by Martin Bobak &lt;mbobak@cisco.com&gt; on 9.4.2015.
- */
 public interface FlowDescriptor {
 
     FlowId getFlowId();
 
     TableKey getTableKey();
+
 }
index 05eb9f6d0da4ae00a698fc71f39cc0a3978897f5..7414f70f4d81e997387f59de51b88ed9dc34052d 100644 (file)
@@ -8,22 +8,9 @@
 
 package org.opendaylight.openflowplugin.api.openflow.registry.group;
 
-import java.util.List;
+import org.opendaylight.openflowplugin.api.openflow.registry.CommonDeviceRegistry;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupId;
 
-/**
- * Created by Martin Bobak &lt;mbobak@cisco.com&gt; on 15.4.2015.
- */
-public interface DeviceGroupRegistry extends AutoCloseable {
-
-    void store(GroupId groupId);
-
-    void markToBeremoved(GroupId groupId);
-
-    void removeMarked();
-
-    List<GroupId> getAllGroupIds();
+public interface DeviceGroupRegistry extends CommonDeviceRegistry<GroupId> {
 
-    @Override
-    void close();
 }
index 12540cb086c75c48e02e5baa75cff3883a6b150d..25b5bb9bb70328e9e901938e0dc5fd9dd49cdd4a 100644 (file)
@@ -8,23 +8,9 @@
 
 package org.opendaylight.openflowplugin.api.openflow.registry.meter;
 
-import java.util.List;
+import org.opendaylight.openflowplugin.api.openflow.registry.CommonDeviceRegistry;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.MeterId;
 
-/**
- * Created by Martin Bobak &lt;mbobak@cisco.com&gt; on 15.4.2015.
- */
-public interface DeviceMeterRegistry extends AutoCloseable {
-
-    void store(MeterId meterId);
-
-    void markToBeremoved(MeterId meterId);
-
-    void removeMarked();
-
-    List<MeterId> getAllMeterIds();
-
-    @Override
-    void close();
+public interface DeviceMeterRegistry extends CommonDeviceRegistry<MeterId> {
 
 }
index f87bbaa8106762139d9e20a6ff4e6c72d32bce2d..b45234307fc14bcd5effe1b4e4fbc3863aacf57a 100644 (file)
@@ -332,7 +332,7 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi
             //2. create registry key
             final FlowRegistryKey flowRegKey = FlowRegistryKeyFactory.create(getDeviceInfo().getVersion(), flowRemovedNotification);
             //3. lookup flowId
-            final FlowDescriptor flowDescriptor = deviceFlowRegistry.retrieveIdForFlow(flowRegKey);
+            final FlowDescriptor flowDescriptor = deviceFlowRegistry.retrieveDescriptor(flowRegKey);
             //4. if flowId present:
             if (flowDescriptor != null) {
                 // a) construct flow path
index 4115e8b5aa4e229867cb290407be1079fd92de0b..9492b690fb96d6be39bfa54ff2580fbe96a3399a 100644 (file)
@@ -11,20 +11,22 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.collect.BiMap;
 import com.google.common.collect.HashBiMap;
-import com.google.common.collect.Maps;
 import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiConsumer;
 import java.util.function.Consumer;
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.ThreadSafe;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
@@ -42,12 +44,14 @@ import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+@ThreadSafe
 public class DeviceFlowRegistryImpl implements DeviceFlowRegistry {
     private static final Logger LOG = LoggerFactory.getLogger(DeviceFlowRegistryImpl.class);
     private static final String ALIEN_SYSTEM_FLOW_ID = "#UF$TABLE*";
     private static final AtomicInteger UNACCOUNTED_FLOWS_COUNTER = new AtomicInteger(0);
 
-    private final BiMap<FlowRegistryKey, FlowDescriptor> flowRegistry = Maps.synchronizedBiMap(HashBiMap.create());
+    private final BiMap<FlowRegistryKey, FlowDescriptor> flowRegistry = HashBiMap.create();
+    private final List<FlowRegistryKey> marks = new ArrayList<>();
     private final DataBroker dataBroker;
     private final KeyedInstanceIdentifier<Node, NodeKey> instanceIdentifier;
     private final List<ListenableFuture<List<Optional<FlowCapableNode>>>> lastFillFutures = new ArrayList<>();
@@ -66,13 +70,14 @@ public class DeviceFlowRegistryImpl implements DeviceFlowRegistry {
             if (!flowRegistry.containsKey(key)) {
                 LOG.trace("Found flow with table ID : {} and flow ID : {}", flow.getTableId(), flow.getId().getValue());
                 final FlowDescriptor descriptor = FlowDescriptorFactory.create(flow.getTableId(), flow.getId());
-                store(key, descriptor);
+                storeDescriptor(key, descriptor);
             }
         };
     }
 
     @Override
-    public ListenableFuture<List<Optional<FlowCapableNode>>> fill() {
+    @GuardedBy("this")
+    public synchronized ListenableFuture<List<Optional<FlowCapableNode>>> fill() {
         LOG.debug("Filling flow registry with flows for node: {}", instanceIdentifier.getKey().getId().getValue());
 
         // Prepare path for read transaction
@@ -80,12 +85,12 @@ public class DeviceFlowRegistryImpl implements DeviceFlowRegistry {
         final InstanceIdentifier<FlowCapableNode> path = instanceIdentifier.augmentation(FlowCapableNode.class);
 
         // First, try to fill registry with flows from DS/Configuration
-        CheckedFuture<Optional<FlowCapableNode>, ReadFailedException> configFuture = fillFromDatastore(LogicalDatastoreType.CONFIGURATION, path);
+        final CheckedFuture<Optional<FlowCapableNode>, ReadFailedException> configFuture = fillFromDatastore(LogicalDatastoreType.CONFIGURATION, path);
 
         // Now, try to fill registry with flows from DS/Operational
         // in case of cluster fail over, when clients are not using DS/Configuration
         // for adding flows, but only RPCs
-        CheckedFuture<Optional<FlowCapableNode>, ReadFailedException> operationalFuture = fillFromDatastore(LogicalDatastoreType.OPERATIONAL, path);
+        final CheckedFuture<Optional<FlowCapableNode>, ReadFailedException> operationalFuture = fillFromDatastore(LogicalDatastoreType.OPERATIONAL, path);
 
         // And at last, chain and return futures created above.
         // Also, cache this future, so call to DeviceFlowRegistry.close() will be able
@@ -95,6 +100,7 @@ public class DeviceFlowRegistryImpl implements DeviceFlowRegistry {
         return lastFillFuture;
     }
 
+    @GuardedBy("this")
     private CheckedFuture<Optional<FlowCapableNode>, ReadFailedException> fillFromDatastore(final LogicalDatastoreType logicalDatastoreType, final InstanceIdentifier<FlowCapableNode> path) {
         // Create new read-only transaction
         final ReadOnlyTransaction transaction = dataBroker.newReadOnlyTransaction();
@@ -102,32 +108,32 @@ public class DeviceFlowRegistryImpl implements DeviceFlowRegistry {
         // Bail out early if transaction is null
         if (transaction == null) {
             return Futures.immediateFailedCheckedFuture(
-                    new ReadFailedException("Read transaction is null"));
+                new ReadFailedException("Read transaction is null"));
         }
 
         // Prepare read operation from datastore for path
         final CheckedFuture<Optional<FlowCapableNode>, ReadFailedException> future =
-                transaction.read(logicalDatastoreType, path);
+            transaction.read(logicalDatastoreType, path);
 
         // Bail out early if future is null
         if (future == null) {
             return Futures.immediateFailedCheckedFuture(
-                    new ReadFailedException("Future from read transaction is null"));
+                new ReadFailedException("Future from read transaction is null"));
         }
 
         Futures.addCallback(future, new FutureCallback<Optional<FlowCapableNode>>() {
             @Override
             public void onSuccess(Optional<FlowCapableNode> result) {
                 result.asSet().stream()
-                        .filter(Objects::nonNull)
-                        .filter(flowCapableNode -> Objects.nonNull(flowCapableNode.getTable()))
-                        .flatMap(flowCapableNode -> flowCapableNode.getTable().stream())
-                        .filter(Objects::nonNull)
-                        .filter(table -> Objects.nonNull(table.getFlow()))
-                        .flatMap(table -> table.getFlow().stream())
-                        .filter(Objects::nonNull)
-                        .filter(flow -> Objects.nonNull(flow.getId()))
-                        .forEach(flowConsumer);
+                    .filter(Objects::nonNull)
+                    .filter(flowCapableNode -> Objects.nonNull(flowCapableNode.getTable()))
+                    .flatMap(flowCapableNode -> flowCapableNode.getTable().stream())
+                    .filter(Objects::nonNull)
+                    .filter(table -> Objects.nonNull(table.getFlow()))
+                    .flatMap(table -> table.getFlow().stream())
+                    .filter(Objects::nonNull)
+                    .filter(flow -> Objects.nonNull(flow.getId()))
+                    .forEach(flowConsumer);
 
                 // After we are done with reading from datastore, close the transaction
                 transaction.close();
@@ -144,84 +150,115 @@ public class DeviceFlowRegistryImpl implements DeviceFlowRegistry {
     }
 
     @Override
-    public FlowDescriptor retrieveIdForFlow(final FlowRegistryKey flowRegistryKey) {
+    @GuardedBy("this")
+    public synchronized FlowDescriptor retrieveDescriptor(final FlowRegistryKey flowRegistryKey) {
         LOG.trace("Retrieving flow descriptor for flow hash : {}", flowRegistryKey.hashCode());
-        FlowDescriptor flowDescriptor = flowRegistry.get(flowRegistryKey);
-        // Get FlowDescriptor from flow registry
-        if(flowDescriptor == null){
-            if (LOG.isTraceEnabled()) {
-                LOG.trace("Failed to retrieve flow descriptor for flow hash : {}, trying with custom equals method", flowRegistryKey.hashCode());
-            }
-            for(Map.Entry<FlowRegistryKey, FlowDescriptor> fd : flowRegistry.entrySet()) {
-                if (flowRegistryKey.equals(fd.getKey())) {
-                    flowDescriptor = fd.getValue();
-                    break;
-                }
-            }
-        }
-        return flowDescriptor;
+        return flowRegistry.get(correctFlowRegistryKey(flowRegistry.keySet(), flowRegistryKey));
     }
 
     @Override
-    public void store(final FlowRegistryKey flowRegistryKey, final FlowDescriptor flowDescriptor) {
+    @GuardedBy("this")
+    public synchronized void storeDescriptor(final FlowRegistryKey flowRegistryKey, final FlowDescriptor flowDescriptor) {
+        final FlowRegistryKey correctFlowRegistryKey = correctFlowRegistryKey(flowRegistry.keySet(), flowRegistryKey);
+
         try {
-          LOG.trace("Storing flowDescriptor with table ID : {} and flow ID : {} for flow hash : {}",
-                    flowDescriptor.getTableKey().getId(), flowDescriptor.getFlowId().getValue(), flowRegistryKey.hashCode());
-          flowRegistry.put(flowRegistryKey, flowDescriptor);
+            if (hasMark(correctFlowRegistryKey)) {
+                // We are probably doing update of flow ID or table ID, so remove mark for removal of this flow
+                // and replace it with new value
+                marks.remove(correctFlowRegistryKey(marks, correctFlowRegistryKey));
+                flowRegistry.forcePut(correctFlowRegistryKey, flowDescriptor);
+                return;
+            }
+
+            LOG.trace("Storing flowDescriptor with table ID : {} and flow ID : {} for flow hash : {}",
+                flowDescriptor.getTableKey().getId(), flowDescriptor.getFlowId().getValue(), correctFlowRegistryKey.hashCode());
+
+            flowRegistry.put(correctFlowRegistryKey, flowDescriptor);
         } catch (IllegalArgumentException ex) {
-          LOG.warn("Flow with flowId {} already exists in table {}", flowDescriptor.getFlowId().getValue(),
-                    flowDescriptor.getTableKey().getId());
-          final FlowId newFlowId = createAlienFlowId(flowDescriptor.getTableKey().getId());
-          final FlowDescriptor newFlowDescriptor = FlowDescriptorFactory.
-            create(flowDescriptor.getTableKey().getId(), newFlowId);
-          flowRegistry.put(flowRegistryKey, newFlowDescriptor);
+            if (hasMark(flowRegistry.inverse().get(flowDescriptor))) {
+                // We are probably doing update of flow, but without changing flow ID or table ID, so we need to replace
+                // old value with new value, but keep the old value marked for removal
+                flowRegistry.forcePut(correctFlowRegistryKey, flowDescriptor);
+                return;
+            }
+
+            // We are trying to store new flow to flow registry, but we already have different flow with same flow ID
+            // stored in registry, so we need to create alien ID for this new flow here.
+            LOG.warn("Flow with flow ID {} already exists in table {}, generating alien flow ID", flowDescriptor.getFlowId().getValue(),
+                flowDescriptor.getTableKey().getId());
+
+            flowRegistry.put(
+                correctFlowRegistryKey,
+                FlowDescriptorFactory.create(
+                    flowDescriptor.getTableKey().getId(),
+                    createAlienFlowId(flowDescriptor.getTableKey().getId())));
         }
     }
 
     @Override
-    public void update(final FlowRegistryKey newFlowRegistryKey, final FlowDescriptor flowDescriptor) {
-        LOG.trace("Updating the entry with hash: {}", newFlowRegistryKey.hashCode());
-        flowRegistry.forcePut(newFlowRegistryKey, flowDescriptor);
+    @GuardedBy("this")
+    public synchronized void forEachEntry(final BiConsumer<FlowRegistryKey, FlowDescriptor> consumer) {
+        flowRegistry.forEach(consumer);
     }
 
     @Override
-    public FlowId storeIfNecessary(final FlowRegistryKey flowRegistryKey) {
-        LOG.trace("Trying to retrieve flow ID for flow hash : {}", flowRegistryKey.hashCode());
-
-        // First, try to get FlowDescriptor from flow registry
-        FlowDescriptor flowDescriptor = retrieveIdForFlow(flowRegistryKey);
-
-        // We was not able to retrieve FlowDescriptor, so we will at least try to generate it
-        if (flowDescriptor == null) {
-            LOG.trace("Flow descriptor for flow hash : {} not found, generating alien flow ID", flowRegistryKey.hashCode());
+    @GuardedBy("this")
+    public synchronized void store(final FlowRegistryKey flowRegistryKey) {
+        if (Objects.isNull(retrieveDescriptor(flowRegistryKey))) {
+            // We do not found flow in flow registry, that means it do not have any ID already assigned, so we need
+            // to generate new alien flow ID here.
+            LOG.debug("Flow descriptor for flow hash : {} not found, generating alien flow ID", flowRegistryKey.hashCode());
             final short tableId = flowRegistryKey.getTableId();
             final FlowId alienFlowId = createAlienFlowId(tableId);
-            flowDescriptor = FlowDescriptorFactory.create(tableId, alienFlowId);
+            final FlowDescriptor flowDescriptor = FlowDescriptorFactory.create(tableId, alienFlowId);
 
             // Finally we got flowDescriptor, so now we will store it to registry,
             // so next time we won't need to generate it again
-            store(flowRegistryKey, flowDescriptor);
+            storeDescriptor(flowRegistryKey, flowDescriptor);
         }
-
-        return flowDescriptor.getFlowId();
     }
 
     @Override
-    public void removeDescriptor(final FlowRegistryKey flowRegistryKey) {
+    @GuardedBy("this")
+    public synchronized void addMark(final FlowRegistryKey flowRegistryKey) {
         LOG.trace("Removing flow descriptor for flow hash : {}", flowRegistryKey.hashCode());
-        flowRegistry.remove(flowRegistryKey);
+        marks.add(flowRegistryKey);
     }
 
     @Override
-    public Map<FlowRegistryKey, FlowDescriptor> getAllFlowDescriptors() {
-        return Collections.unmodifiableMap(flowRegistry);
+    @GuardedBy("this")
+    public synchronized boolean hasMark(final FlowRegistryKey flowRegistryKey) {
+        return Objects.nonNull(flowRegistryKey) && marks.contains(correctFlowRegistryKey(marks, flowRegistryKey));
+
     }
 
     @Override
-    public void close() {
+    @GuardedBy("this")
+    public synchronized void processMarks() {
+        // Remove all flows that was marked for removal from flow registry and clear all marks.
+        marks.forEach(flowRegistry::remove);
+        marks.clear();
+    }
+
+    @Override
+    @GuardedBy("this")
+    public synchronized void forEach(final Consumer<FlowRegistryKey> consumer) {
+        flowRegistry.keySet().forEach(consumer);
+    }
+
+    @Override
+    @GuardedBy("this")
+    public synchronized int size() {
+        return flowRegistry.size();
+    }
+
+    @Override
+    @GuardedBy("this")
+    public synchronized void close() {
         final Iterator<ListenableFuture<List<Optional<FlowCapableNode>>>> iterator = lastFillFutures.iterator();
 
-        while(iterator.hasNext()) {
+        // We need to force interrupt and clear all running futures that are trying to read flow IDs from datastore
+        while (iterator.hasNext()) {
             final ListenableFuture<List<Optional<FlowCapableNode>>> next = iterator.next();
             boolean success = next.cancel(true);
             LOG.trace("Cancelling filling flow registry with flows job {} with result: {}", next, success);
@@ -229,11 +266,44 @@ public class DeviceFlowRegistryImpl implements DeviceFlowRegistry {
         }
 
         flowRegistry.clear();
+        marks.clear();
+    }
+
+    @GuardedBy("this")
+    private FlowRegistryKey correctFlowRegistryKey(final Collection<FlowRegistryKey> flowRegistryKeys, final FlowRegistryKey key) {
+        if (Objects.isNull(key)) {
+            return null;
+        }
+
+        if (!flowRegistryKeys.contains(key)) {
+            // If we failed to compare FlowRegistryKey by hashCode, try to retrieve correct FlowRegistryKey
+            // from set of keys using custom comparator method for Match. This case can occur when we have different
+            // augmentations on extensions, or switch returned things like IP address or port in different format that
+            // we sent.
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Failed to retrieve flow descriptor for flow hash : {}, trying with custom equals method", key.hashCode());
+            }
+
+            for (final FlowRegistryKey flowRegistryKey : flowRegistryKeys) {
+                if (key.equals(flowRegistryKey)) {
+                    return flowRegistryKey;
+                }
+            }
+        }
+
+        // If we failed to find key at all or key is already present in set of keys, just return original key
+        return key;
     }
 
     @VisibleForTesting
     static FlowId createAlienFlowId(final short tableId) {
         final String alienId = ALIEN_SYSTEM_FLOW_ID + tableId + '-' + UNACCOUNTED_FLOWS_COUNTER.incrementAndGet();
+        LOG.debug("Created alien flow id {} for table id {}", alienId, tableId);
         return new FlowId(alienId);
     }
+
+    @VisibleForTesting
+    Map<FlowRegistryKey, FlowDescriptor> getAllFlowDescriptors() {
+        return flowRegistry;
+    }
 }
index 1698929974f1474a0c248be6a816618aa15ded7f..6fb6d5c9ec6ed6f0897725a59b1857b6f9f90567 100644 (file)
@@ -8,49 +8,60 @@
 
 package org.opendaylight.openflowplugin.impl.registry.group;
 
+import com.google.common.annotations.VisibleForTesting;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
+import java.util.function.Consumer;
 import org.opendaylight.openflowplugin.api.openflow.registry.group.DeviceGroupRegistry;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupId;
 
-/**
- * Created by Martin Bobak &lt;mbobak@cisco.com&gt; on 15.4.2015.
- */
 public class DeviceGroupRegistryImpl implements DeviceGroupRegistry {
 
-    private final List<GroupId> groupIdList = new ArrayList<>();
-    private final List<GroupId> marks = new ArrayList<>();
+    private final List<GroupId> groupIds = Collections.synchronizedList(new ArrayList<>());
+    private final List<GroupId> marks = Collections.synchronizedList(new ArrayList<>());
 
     @Override
     public void store(final GroupId groupId) {
-        groupIdList.add(groupId);
+        groupIds.add(groupId);
     }
 
     @Override
-    public void markToBeremoved(final GroupId groupId) {
+    public void addMark(final GroupId groupId) {
         marks.add(groupId);
     }
 
     @Override
-    public void removeMarked() {
-        synchronized (groupIdList) {
-            groupIdList.removeAll(marks);
-        }
+    public boolean hasMark(final GroupId groupId) {
+        return marks.contains(groupId);
+    }
+
+    @Override
+    public void processMarks() {
+        groupIds.removeAll(marks);
         marks.clear();
     }
 
     @Override
-    public List<GroupId> getAllGroupIds() {
-        return groupIdList;
+    public void forEach(final Consumer<GroupId> consumer) {
+        synchronized (groupIds) {
+            groupIds.forEach(consumer);
+        }
+    }
+
+    @Override
+    public int size() {
+        return groupIds.size();
     }
 
     @Override
     public void close() {
-        synchronized (groupIdList) {
-            groupIdList.clear();
-        }
-        synchronized (marks) {
-            marks.clear();
-        }
+        groupIds.clear();
+        marks.clear();
+    }
+
+    @VisibleForTesting
+    List<GroupId> getAllGroupIds() {
+        return groupIds;
     }
 }
index c5e2192a530ee2bbe7889788e76051596f6c2761..a29dc3cb9d694ac895319625db6178e4017f3e23 100644 (file)
@@ -8,18 +8,18 @@
 
 package org.opendaylight.openflowplugin.impl.registry.meter;
 
+import com.google.common.annotations.VisibleForTesting;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
+import java.util.function.Consumer;
 import org.opendaylight.openflowplugin.api.openflow.registry.meter.DeviceMeterRegistry;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.MeterId;
 
-/**
- * Created by Martin Bobak &lt;mbobak@cisco.com&gt; on 15.4.2015.
- */
 public class DeviceMeterRegistryImpl implements DeviceMeterRegistry {
 
-    private final List<MeterId> meterIds = new ArrayList<>();
-    private final List<MeterId> marks = new ArrayList<>();
+    private final List<MeterId> meterIds = Collections.synchronizedList(new ArrayList<>());
+    private final List<MeterId> marks = Collections.synchronizedList(new ArrayList<>());
 
     @Override
     public void store(final MeterId meterId) {
@@ -27,32 +27,41 @@ public class DeviceMeterRegistryImpl implements DeviceMeterRegistry {
     }
 
     @Override
-    public void markToBeremoved(final MeterId meterId) {
+    public void addMark(final MeterId meterId) {
         marks.add(meterId);
     }
 
     @Override
-    public void removeMarked() {
+    public boolean hasMark(final MeterId meterId) {
+        return marks.contains(meterId);
+    }
+
+    @Override
+    public void processMarks() {
+        meterIds.removeAll(marks);
+        marks.clear();
+    }
+
+    @Override
+    public void forEach(final Consumer<MeterId> consumer) {
         synchronized (meterIds) {
-            meterIds.removeAll(marks);
-        }
-        synchronized (marks) {
-            marks.clear();
+            meterIds.forEach(consumer);
         }
     }
 
     @Override
-    public List<MeterId> getAllMeterIds() {
-        return meterIds;
+    public int size() {
+        return meterIds.size();
     }
 
     @Override
     public void close() {
-        synchronized (meterIds) {
-            meterIds.clear();
-        }
-        synchronized (marks) {
-            marks.clear();
-        }
+        meterIds.clear();
+        marks.clear();
+    }
+
+    @VisibleForTesting
+    List<MeterId> getAllMeterIds() {
+        return meterIds;
     }
 }
index da8d74275dd45956d9b493e303366984211b7320..a516bccc7f72cd1e6f23ce3b8c9183ba4fc73c07 100644 (file)
@@ -112,6 +112,7 @@ final class MultipartRequestOnTheFlyCallback extends AbstractRequestCallback<Lis
             StatisticsGatheringUtils.writeFlowStatistics(allMultipartData, deviceInfo, registry, txFacade);
             if (!multipartReply.getFlags().isOFPMPFREQMORE()) {
                 endCollecting();
+                registry.processMarks();
             }
         }
     }
index 5a4ed71f75fa31bd683ccf159873f79946e18a8a..a18385e7b3ce4062c59122da622e68c01fec6c2c 100644 (file)
@@ -144,10 +144,10 @@ public class SalFlowServiceImpl implements SalFlowService, ItemLifeCycleSource {
                 if (Objects.nonNull(input.getFlowRef())) {
                     final FlowId flowId = input.getFlowRef().getValue().firstKeyOf(Flow.class, FlowKey.class).getId();
                     flowDescriptor = FlowDescriptorFactory.create(input.getTableId(), flowId);
-                    deviceContext.getDeviceFlowRegistry().store(flowRegistryKey, flowDescriptor);
+                    deviceContext.getDeviceFlowRegistry().storeDescriptor(flowRegistryKey, flowDescriptor);
                 } else {
-                    final FlowId flowId = deviceContext.getDeviceFlowRegistry().storeIfNecessary(flowRegistryKey);
-                    flowDescriptor = FlowDescriptorFactory.create(input.getTableId(), flowId);
+                    deviceContext.getDeviceFlowRegistry().store(flowRegistryKey);
+                    flowDescriptor = deviceContext.getDeviceFlowRegistry().retrieveDescriptor(flowRegistryKey);
                 }
 
                 if (LOG.isDebugEnabled()) {
@@ -188,11 +188,11 @@ public class SalFlowServiceImpl implements SalFlowService, ItemLifeCycleSource {
                     LOG.debug("Flow remove finished without error for flow={}", input);
                 }
                 FlowRegistryKey flowRegistryKey = FlowRegistryKeyFactory.create(deviceContext.getDeviceInfo().getVersion(), input);
-                deviceContext.getDeviceFlowRegistry().removeDescriptor(flowRegistryKey);
+                deviceContext.getDeviceFlowRegistry().addMark(flowRegistryKey);
 
                 if (itemLifecycleListener != null) {
-                    final FlowDescriptor flowDescriptor =
-                            deviceContext.getDeviceFlowRegistry().retrieveIdForFlow(flowRegistryKey);
+                    final FlowDescriptor flowDescriptor = deviceContext.getDeviceFlowRegistry().retrieveDescriptor(flowRegistryKey);
+
                     if (flowDescriptor != null) {
                         KeyedInstanceIdentifier<Flow, FlowKey> flowPath = createFlowPath(flowDescriptor,
                                 deviceContext.getDeviceInfo().getNodeInstanceIdentifier());
@@ -228,16 +228,25 @@ public class SalFlowServiceImpl implements SalFlowService, ItemLifeCycleSource {
             final OriginalFlow original = input.getOriginalFlow();
             final FlowRegistryKey origFlowRegistryKey = FlowRegistryKeyFactory.create(deviceContext.getDeviceInfo().getVersion(), original);
             final FlowRegistryKey updatedFlowRegistryKey = FlowRegistryKeyFactory.create(deviceContext.getDeviceInfo().getVersion(), updated);
-            final FlowDescriptor origFlowDescriptor = deviceFlowRegistry.retrieveIdForFlow(origFlowRegistryKey);
+            final FlowDescriptor origFlowDescriptor = deviceFlowRegistry.retrieveDescriptor(origFlowRegistryKey);
 
             final boolean isUpdate = Objects.nonNull(origFlowDescriptor);
-            final FlowId fLowId = Objects.nonNull(input.getFlowRef())
-                    ? input.getFlowRef().getValue().firstKeyOf(Flow.class).getId()
-                    : isUpdate ? origFlowDescriptor.getFlowId() : deviceFlowRegistry.storeIfNecessary(updatedFlowRegistryKey);
-            final FlowDescriptor updatedFlowDescriptor = FlowDescriptorFactory.create(updated.getTableId(), fLowId);
+            final FlowDescriptor updatedFlowDescriptor;
+
+            if (Objects.nonNull(input.getFlowRef())) {
+               updatedFlowDescriptor = FlowDescriptorFactory.create(updated.getTableId(), input.getFlowRef().getValue().firstKeyOf(Flow.class).getId());
+            } else {
+                if (isUpdate) {
+                    updatedFlowDescriptor = origFlowDescriptor;
+                } else {
+                    deviceFlowRegistry.store(updatedFlowRegistryKey);
+                    updatedFlowDescriptor = deviceFlowRegistry.retrieveDescriptor(updatedFlowRegistryKey);
+                }
+            }
+
             if (isUpdate) {
-                deviceFlowRegistry.removeDescriptor(origFlowRegistryKey);
-                deviceFlowRegistry.store(updatedFlowRegistryKey, updatedFlowDescriptor);
+                deviceFlowRegistry.addMark(origFlowRegistryKey);
+                deviceFlowRegistry.storeDescriptor(updatedFlowRegistryKey, updatedFlowDescriptor);
             }
 
             if (itemLifecycleListener != null) {
index a6264a1ba704c68f552197d76712a3970a092121..3bbe45aeefbbd58a75c28ffe426c19a5c7b1da30 100644 (file)
@@ -126,7 +126,7 @@ public class SalGroupServiceImpl implements SalGroupService, ItemLifeCycleSource
                     if (LOG.isDebugEnabled()) {
                         LOG.debug("Group remove with id={} finished without error", input.getGroupId().getValue());
                     }
-                    removeGroup.getDeviceRegistry().getDeviceGroupRegistry().markToBeremoved(input.getGroupId());
+                    removeGroup.getDeviceRegistry().getDeviceGroupRegistry().addMark(input.getGroupId());
                     removeIfNecessaryFromDS(input.getGroupId());
                 } else {
                     if (LOG.isDebugEnabled()) {
index 02f72dc6ab8195da6c55f43f32682d7580ca5960..1a4ac4c3dbd682b9a3b260f5e92ebd204e21a888 100644 (file)
@@ -119,7 +119,6 @@ public class SalMeterServiceImpl implements SalMeterService, ItemLifeCycleSource
 
     @Override
     public Future<RpcResult<RemoveMeterOutput>> removeMeter(final RemoveMeterInput input) {
-        removeMeter.getDeviceRegistry().getDeviceMeterRegistry().markToBeremoved(input.getMeterId());
         final ListenableFuture<RpcResult<RemoveMeterOutput>> resultFuture = removeMeter.handleServiceCall(input);
         Futures.addCallback(resultFuture, new FutureCallback<RpcResult<RemoveMeterOutput>>() {
             @Override
@@ -128,6 +127,7 @@ public class SalMeterServiceImpl implements SalMeterService, ItemLifeCycleSource
                     if (LOG.isDebugEnabled()) {
                         LOG.debug("Meter remove with id={} finished without error", input.getMeterId());
                     }
+                    removeMeter.getDeviceRegistry().getDeviceMeterRegistry().addMark(input.getMeterId());
                     removeIfNecessaryFromDS(input.getMeterId());
                 } else {
                     if (LOG.isDebugEnabled()) {
@@ -164,4 +164,4 @@ public class SalMeterServiceImpl implements SalMeterService, ItemLifeCycleSource
     private static KeyedInstanceIdentifier<org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter, MeterKey> createMeterPath(final MeterId meterId, final KeyedInstanceIdentifier<Node, NodeKey> nodePath) {
         return nodePath.augmentation(FlowCapableNode.class).child(org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter.class, new MeterKey(meterId));
     }
-}
\ No newline at end of file
+}
index bb789b62712adcb86e0667410cb2bfd40a68dcac..966608079424ad29c84e875d2eda890e1a989e8c 100644 (file)
@@ -321,6 +321,7 @@ public final class StatisticsGatheringUtils {
             writeFlowStatistics(data, deviceInfo, flowRegistry, txFacade);
             txFacade.submitTransaction();
             EventsTimeCounter.markEnd(eventIdentifier);
+            flowRegistry.processMarks();
             return Boolean.TRUE;
         });
     }
@@ -339,7 +340,8 @@ public final class StatisticsGatheringUtils {
 
                     final short tableId = flowStat.getTableId();
                     final FlowRegistryKey flowRegistryKey = FlowRegistryKeyFactory.create(deviceInfo.getVersion(), flowBuilder.build());
-                    final FlowId flowId = registry.storeIfNecessary(flowRegistryKey);
+                    registry.store(flowRegistryKey);
+                    final FlowId flowId = registry.retrieveDescriptor(flowRegistryKey).getFlowId();
 
                     final FlowKey flowKey = new FlowKey(flowId);
                     flowBuilder.setKey(flowKey);
@@ -522,11 +524,11 @@ public final class StatisticsGatheringUtils {
             final DeviceMeterRegistry meterRegistry,
             final InstanceIdentifier<FlowCapableNode> flowNodeIdent,
             final TxFacade txFacade) throws TransactionChainClosedException {
-        for (final MeterId meterId : meterRegistry.getAllMeterIds()) {
+        meterRegistry.forEach(meterId -> {
             final InstanceIdentifier<Meter> meterIdent = flowNodeIdent.child(Meter.class, new MeterKey(meterId));
             txFacade.addDeleteToTxChain(LogicalDatastoreType.OPERATIONAL, meterIdent);
-        }
-        meterRegistry.removeMarked();
+        });
+        meterRegistry.processMarks();
     }
 
     private static void processGroupDescStats(
@@ -558,11 +560,11 @@ public final class StatisticsGatheringUtils {
             final TxFacade txFacade,
             final InstanceIdentifier<FlowCapableNode> flowNodeIdent,
             final DeviceGroupRegistry groupRegistry) throws TransactionChainClosedException {
-        for (final GroupId groupId : groupRegistry.getAllGroupIds()) {
+        groupRegistry.forEach(groupId -> {
             final InstanceIdentifier<Group> groupIdent = flowNodeIdent.child(Group.class, new GroupKey(groupId));
             txFacade.addDeleteToTxChain(LogicalDatastoreType.OPERATIONAL, groupIdent);
-        }
-        groupRegistry.removeMarked();
+        });
+        groupRegistry.processMarks();
     }
 
     private static void processGroupStatistics(
index a106ad93e57f9a16f74e1d6a0b7058c2863e27b2..e6d6181fb64db1bfe59d542981b3e383928dc193 100644 (file)
@@ -20,7 +20,6 @@ import org.opendaylight.openflowplugin.extension.api.path.MatchPath;
 import org.opendaylight.openflowplugin.impl.registry.flow.FlowRegistryKeyFactory;
 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.data.FlowStatsResponseConvertorData;
-import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.data.VersionDatapathIdConvertorData;
 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.match.MatchReactor;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.direct.statistics.rev160511.GetFlowStatisticsInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.direct.statistics.rev160511.GetFlowStatisticsOutput;
@@ -168,6 +167,7 @@ public class FlowDirectStatisticsService extends AbstractDirectStatisticsService
                 .addAugmentation(FlowStatisticsData.class, flowStatisticsDataBld.build());
 
         final FlowRegistryKey flowRegistryKey = FlowRegistryKeyFactory.create(getVersion(), flowBuilder.build());
-        return getDeviceRegistry().getDeviceFlowRegistry().storeIfNecessary(flowRegistryKey);
+        getDeviceRegistry().getDeviceFlowRegistry().store(flowRegistryKey);
+        return getDeviceRegistry().getDeviceFlowRegistry().retrieveDescriptor(flowRegistryKey).getFlowId();
     }
 }
index e5a3167bc5ab581e4ad722f752e958c669d2525c..95e454a98001f6ade7afd1aa346a28c2363d84a5 100644 (file)
@@ -478,7 +478,7 @@ public class DeviceContextImplTest {
         // insert flow+flowId into local registry
         final FlowRegistryKey flowRegKey = FlowRegistryKeyFactory.create(deviceInfo.getVersion(), flowRemovedMdsalBld.build());
         final FlowDescriptor flowDescriptor = FlowDescriptorFactory.create((short) 0, new FlowId("ut-ofp:f456"));
-        deviceContext.getDeviceFlowRegistry().store(flowRegKey, flowDescriptor);
+        deviceContext.getDeviceFlowRegistry().storeDescriptor(flowRegKey, flowDescriptor);
 
         // plug in lifecycleListener
         final ItemLifecycleListener itemLifecycleListener = Mockito.mock(ItemLifecycleListener.class);
@@ -531,9 +531,9 @@ public class DeviceContextImplTest {
     public void testOnDeviceDisconnected() throws Exception {
         final DeviceTerminationPhaseHandler deviceContextClosedHandler = mock(DeviceTerminationPhaseHandler.class);
 
-        assertEquals(0, deviceContext.getDeviceFlowRegistry().getAllFlowDescriptors().size());
-        assertEquals(0, deviceContext.getDeviceGroupRegistry().getAllGroupIds().size());
-        assertEquals(0, deviceContext.getDeviceMeterRegistry().getAllMeterIds().size());
+        assertEquals(0, deviceContext.getDeviceFlowRegistry().size());
+        assertEquals(0, deviceContext.getDeviceGroupRegistry().size());
+        assertEquals(0, deviceContext.getDeviceMeterRegistry().size());
 
     }
 
index 250d13615c08833ec81a3b51801357d4ef807639..d4f5906426ca986ed02b9246a3a46e1e26d2395d 100644 (file)
@@ -80,7 +80,7 @@ public class DeviceFlowRegistryImplTest {
         descriptor = FlowDescriptorFactory.create(key.getTableId(), new FlowId("ut:1"));
 
         Assert.assertEquals(0, deviceFlowRegistry.getAllFlowDescriptors().size());
-        deviceFlowRegistry.store(key, descriptor);
+        deviceFlowRegistry.storeDescriptor(key, descriptor);
         Assert.assertEquals(1, deviceFlowRegistry.getAllFlowDescriptors().size());
     }
 
@@ -113,7 +113,7 @@ public class DeviceFlowRegistryImplTest {
         order.verify(readOnlyTransaction).read(LogicalDatastoreType.OPERATIONAL, path);
         assertTrue(allFlowDescriptors.containsKey(key));
 
-        deviceFlowRegistry.removeDescriptor(key);
+        deviceFlowRegistry.addMark(key);
     }
 
     @Test
@@ -166,23 +166,23 @@ public class DeviceFlowRegistryImplTest {
 
     @Test
     public void testRetrieveIdForFlow() throws Exception {
-        Assert.assertEquals(descriptor, deviceFlowRegistry.retrieveIdForFlow(key));
+        Assert.assertEquals(descriptor, deviceFlowRegistry.retrieveDescriptor(key));
     }
 
     @Test
     public void testStore() throws Exception {
         //store the same key with different value
         final FlowDescriptor descriptor2 = FlowDescriptorFactory.create(key.getTableId(), new FlowId("ut:2"));
-        deviceFlowRegistry.store(key, descriptor2);
+        deviceFlowRegistry.storeDescriptor(key, descriptor2);
         Assert.assertEquals(1, deviceFlowRegistry.getAllFlowDescriptors().size());
-        Assert.assertEquals("ut:2", deviceFlowRegistry.retrieveIdForFlow(key).getFlowId().getValue());
+        Assert.assertEquals("ut:2", deviceFlowRegistry.retrieveDescriptor(key).getFlowId().getValue());
 
         // store new key with old value
         final FlowAndStatisticsMapList flowStats = TestFlowHelper.createFlowAndStatisticsMapListBuilder(2).build();
         final FlowRegistryKey key2 = FlowRegistryKeyFactory.create(OFConstants.OFP_VERSION_1_3, flowStats);
-        deviceFlowRegistry.store(key2, descriptor);
+        deviceFlowRegistry.storeDescriptor(key2, descriptor);
         Assert.assertEquals(2, deviceFlowRegistry.getAllFlowDescriptors().size());
-        Assert.assertEquals("ut:1", deviceFlowRegistry.retrieveIdForFlow(key2).getFlowId().getValue());
+        Assert.assertEquals("ut:1", deviceFlowRegistry.retrieveDescriptor(key2).getFlowId().getValue());
     }
 
     @Test
@@ -190,26 +190,28 @@ public class DeviceFlowRegistryImplTest {
         FlowId newFlowId;
 
         //store existing key
-        newFlowId = deviceFlowRegistry.storeIfNecessary(key);
+        deviceFlowRegistry.store(key);
+        newFlowId = deviceFlowRegistry.retrieveDescriptor(key).getFlowId();
 
         Assert.assertEquals(1, deviceFlowRegistry.getAllFlowDescriptors().size());
-        Assert.assertEquals(descriptor, deviceFlowRegistry.retrieveIdForFlow(key));
+        Assert.assertEquals(descriptor, deviceFlowRegistry.retrieveDescriptor(key));
         Assert.assertEquals(descriptor.getFlowId(), newFlowId);
 
         //store new key
         final String alienPrefix = "#UF$TABLE*2-";
         final FlowRegistryKey key2 = FlowRegistryKeyFactory.create(OFConstants.OFP_VERSION_1_3, TestFlowHelper.createFlowAndStatisticsMapListBuilder(2).build());
-        newFlowId = deviceFlowRegistry.storeIfNecessary(key2);
+        deviceFlowRegistry.store(key2);
+        newFlowId = deviceFlowRegistry.retrieveDescriptor(key2).getFlowId();
 
         Assert.assertTrue(newFlowId.getValue().startsWith(alienPrefix));
-        Assert.assertTrue(deviceFlowRegistry.retrieveIdForFlow(key2).getFlowId().getValue().startsWith(alienPrefix));
+        Assert.assertTrue(deviceFlowRegistry.retrieveDescriptor(key2).getFlowId().getValue().startsWith(alienPrefix));
         Assert.assertEquals(2, deviceFlowRegistry.getAllFlowDescriptors().size());
     }
 
     @Test
     public void testRemoveDescriptor() throws Exception {
-        deviceFlowRegistry.removeDescriptor(key);
-        Assert.assertEquals(0, deviceFlowRegistry.getAllFlowDescriptors().size());
+        deviceFlowRegistry.addMark(key);
+        Assert.assertEquals(1, deviceFlowRegistry.getAllFlowDescriptors().size());
     }
 
     @Test
index a0d74b049936b3d8458c2ac7d26436c701523442..6b62e9f16938bb320bf64f5b166dcaecd4aa7cc3 100644 (file)
@@ -40,29 +40,29 @@ public class DeviceGroupRegistryImplTest {
 
     @Test
     public void testRemoveMarked() throws Exception {
-        deviceGroupRegistry.markToBeremoved(groupId);
-        deviceGroupRegistry.removeMarked();
+        deviceGroupRegistry.addMark(groupId);
+        deviceGroupRegistry.processMarks();
         Assert.assertEquals(0, deviceGroupRegistry.getAllGroupIds().size());
     }
 
     @Test
     public void testRemoveMarkedNegative() throws Exception {
-        deviceGroupRegistry.markToBeremoved(groupId2);
-        deviceGroupRegistry.removeMarked();
+        deviceGroupRegistry.addMark(groupId2);
+        deviceGroupRegistry.processMarks();
         Assert.assertEquals(1, deviceGroupRegistry.getAllGroupIds().size());
     }
 
 
     @Test
     public void testClose() throws Exception {
-        deviceGroupRegistry.markToBeremoved(groupId);
+        deviceGroupRegistry.addMark(groupId);
         deviceGroupRegistry.close();
 
         Assert.assertEquals(0, deviceGroupRegistry.getAllGroupIds().size());
         deviceGroupRegistry.store(groupId);
         Assert.assertEquals(1, deviceGroupRegistry.getAllGroupIds().size());
-        deviceGroupRegistry.removeMarked();
+        deviceGroupRegistry.processMarks();
         Assert.assertEquals(1, deviceGroupRegistry.getAllGroupIds().size());
 
     }
-}
\ No newline at end of file
+}
index 08bd755c85c657a9080213fd7ad8aa8d038e2411..dbc3e3aedc85c7e3e6cbc568bcd6747cf060a133 100644 (file)
@@ -40,29 +40,29 @@ public class DeviceMeterRegistryImplTest {
 
     @Test
     public void testRemoveMarked() throws Exception {
-        deviceMeterRegistry.markToBeremoved(meterId);
-        deviceMeterRegistry.removeMarked();
+        deviceMeterRegistry.addMark(meterId);
+        deviceMeterRegistry.processMarks();
         Assert.assertEquals(0, deviceMeterRegistry.getAllMeterIds().size());
     }
 
     @Test
     public void testRemoveMarkedNegative() throws Exception {
-        deviceMeterRegistry.markToBeremoved(meterId2);
-        deviceMeterRegistry.removeMarked();
+        deviceMeterRegistry.addMark(meterId2);
+        deviceMeterRegistry.processMarks();
         Assert.assertEquals(1, deviceMeterRegistry.getAllMeterIds().size());
     }
 
 
     @Test
     public void testClose() throws Exception {
-        deviceMeterRegistry.markToBeremoved(meterId);
+        deviceMeterRegistry.addMark(meterId);
         deviceMeterRegistry.close();
 
         Assert.assertEquals(0, deviceMeterRegistry.getAllMeterIds().size());
         deviceMeterRegistry.store(meterId);
         Assert.assertEquals(1, deviceMeterRegistry.getAllMeterIds().size());
-        deviceMeterRegistry.removeMarked();
+        deviceMeterRegistry.processMarks();
         Assert.assertEquals(1, deviceMeterRegistry.getAllMeterIds().size());
 
     }
-}
\ No newline at end of file
+}
index 5d15eff5cd7e86192ff42eeafd5da300ab46c6b5..692c29feaa1d3600b4d354b47d7633d8cdf62c92 100644 (file)
@@ -9,7 +9,6 @@ package org.opendaylight.openflowplugin.impl.services;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
@@ -138,7 +137,7 @@ public class MultipartRequestOnTheFlyCallbackTest {
         when(mockedDeviceContext.getDeviceState()).thenReturn(mockedDeviceState);
         when(mockedDeviceContext.getDeviceInfo()).thenReturn(mockedDeviceInfo);
         when(mockedDeviceContext.getDeviceFlowRegistry()).thenReturn(mockedFlowRegistry);
-        when(mockedFlowRegistry.retrieveIdForFlow(Matchers.any(FlowRegistryKey.class))).thenReturn(mockedFlowDescriptor);
+        when(mockedFlowRegistry.retrieveDescriptor(Matchers.any(FlowRegistryKey.class))).thenReturn(mockedFlowDescriptor);
 
         final InstanceIdentifier<FlowCapableNode> nodePath = mockedDeviceInfo.getNodeInstanceIdentifier().augmentation(FlowCapableNode.class);
         final FlowCapableNodeBuilder flowNodeBuilder = new FlowCapableNodeBuilder();
index 89cbef6d7e49435b9e08035672bddc5fb532c03b..8418e13ac8837cebd0b9b8faa6db7d1daf7b09a6 100644 (file)
@@ -285,8 +285,7 @@ public class SalFlowServiceImplTest extends TestCase {
         when(mockedFlowDescriptor.getFlowId()).thenReturn(flowId);
         when(mockedFlowDescriptor.getTableKey()).thenReturn(new TableKey(DUMMY_TABLE_ID));
 
-        when(deviceFlowRegistry.storeIfNecessary(Matchers.any(FlowRegistryKey.class))).thenReturn(flowId);
-        when(deviceFlowRegistry.retrieveIdForFlow(Matchers.any(FlowRegistryKey.class))).thenReturn(mockedFlowDescriptor);
+        when(deviceFlowRegistry.retrieveDescriptor(Matchers.any(FlowRegistryKey.class))).thenReturn(mockedFlowDescriptor);
     }
 
     private <T extends DataObject> void verifyOutput(Future<RpcResult<T>> rpcResultFuture) throws ExecutionException, InterruptedException {
index f0814d8962320ef7bf1e89ecf42033030a9fcebe..bca07192fb93a9e1ec69ad4103b334a745b90997 100644 (file)
@@ -129,7 +129,7 @@ public class SalGroupServiceImplTest extends ServiceMocking {
 
         salGroupService.removeGroup(removeGroupInput);
         verify(mockedRequestContextStack).createRequestContext();
-        verify(mockedDeviceGroupRegistry).markToBeremoved(eq(dummyGroupId));
+        verify(mockedDeviceGroupRegistry).addMark(eq(dummyGroupId));
 
         if (itemLifecycleListener != null) {
             verify(itemLifecycleListener).onRemoved(Matchers.<KeyedInstanceIdentifier<Group, GroupKey>>any());
index ea20a88647c778e0c1aeb0cdf39a7ba47a050353..0cb47df0261c7aae5871803f15d07fb30a8d2558 100644 (file)
@@ -130,7 +130,7 @@ public class SalMeterServiceImplTest extends ServiceMocking {
 
         salMeterService.removeMeter(removeMeterInput);
         verify(mockedRequestContextStack).createRequestContext();
-        verify(mockedDeviceMeterRegistry).markToBeremoved(eq(dummyMeterId));
+        verify(mockedDeviceMeterRegistry).addMark(eq(dummyMeterId));
 
         if (itemLifecycleListener != null) {
             verify(itemLifecycleListener).onRemoved(Matchers.<KeyedInstanceIdentifier<Meter, MeterKey>>any());
index 607adc0627f987e3e48bf13263afb612886915a5..3d6add0f1f51550fd1398984f149b12a432a5321 100644 (file)
@@ -180,7 +180,7 @@ public class StatisticsGatheringUtilsTest {
         when(deviceContext.getDeviceFlowRegistry()).thenReturn(deviceFlowRegistry);
         when(deviceContext.getDeviceGroupRegistry()).thenReturn(deviceGroupRegistry);
         when(deviceContext.getDeviceMeterRegistry()).thenReturn(deviceMeterRegistry);
-        when(deviceFlowRegistry.retrieveIdForFlow(Matchers.any(FlowRegistryKey.class))).thenReturn(flowDescriptor);
+        when(deviceFlowRegistry.retrieveDescriptor(Matchers.any(FlowRegistryKey.class))).thenReturn(flowDescriptor);
         when(deviceContext.getReadTransaction()).thenReturn(readTx);
         when(txFacade.getReadTransaction()).thenReturn(readTx);
         when(deviceContext.getPrimaryConnectionContext()).thenReturn(connectionAdapter);
@@ -284,7 +284,7 @@ public class StatisticsGatheringUtilsTest {
 
         verify(deviceContext, Mockito.never()).addDeleteToTxChain(
                 Matchers.eq(LogicalDatastoreType.OPERATIONAL), Matchers.<InstanceIdentifier<?>>any());
-        verify(deviceGroupRegistry).removeMarked();
+        verify(deviceGroupRegistry).processMarks();
         verify(deviceGroupRegistry).store(storedGroupId);
         verify(txFacade).writeToTransaction(
                 Matchers.eq(LogicalDatastoreType.OPERATIONAL), Matchers.eq(groupPath), Matchers.any(Group.class));
index 9dd7d6aaf954745c388a1e2f5a855a6988f8d044..e2a1869af9c18dd642c78b713b0b6177c38c2d77 100644 (file)
@@ -22,6 +22,8 @@ import java.util.Collections;
 import java.util.List;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.openflowplugin.api.openflow.registry.flow.DeviceFlowRegistry;
+import org.opendaylight.openflowplugin.impl.registry.flow.FlowDescriptorFactory;
+import org.opendaylight.openflowplugin.impl.statistics.services.direct.AbstractDirectStatisticsServiceTest;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.direct.statistics.rev160511.GetFlowStatisticsInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.direct.statistics.rev160511.GetFlowStatisticsOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowId;
@@ -45,7 +47,7 @@ public class FlowDirectStatisticsServiceTest extends AbstractDirectStatisticsSer
     public void setUp() throws Exception {
         service = new FlowDirectStatisticsService(requestContextStack, deviceContext, convertorManager);
         final DeviceFlowRegistry registry = mock(DeviceFlowRegistry.class);
-        when(registry.storeIfNecessary(any())).thenReturn(new FlowId("1"));
+        when(registry.retrieveDescriptor(any())).thenReturn(FlowDescriptorFactory.create(TABLE_NO, new FlowId("1")));
         when(deviceContext.getDeviceFlowRegistry()).thenReturn(registry);
     }
 
@@ -107,4 +109,4 @@ public class FlowDirectStatisticsServiceTest extends AbstractDirectStatisticsSer
         service.storeStatistics(output);
         verify(deviceContext).writeToTransactionWithParentsSlow(eq(LogicalDatastoreType.OPERATIONAL), any(), any());
     }
-}
\ No newline at end of file
+}