--- /dev/null
+/*
+ * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.api.openflow.registry;
+
+import java.util.function.Consumer;
+
+public interface CommonDeviceRegistry<KEY> extends AutoCloseable {
+
+ /**
+ * Store KEY in device registry.
+ * @param key device registry key
+ */
+ void store(KEY key);
+
+ /**
+ * Add mark for specified KEY.
+ * @param key device registry key
+ */
+ void addMark(KEY key);
+
+ /**
+ * Checks if registry has mark for KEY.
+ * @param key device registry key
+ * @return true if device registry has mark for KEY
+ */
+ boolean hasMark(KEY key);
+
+ /**
+ * Process marked keys.
+ */
+ void processMarks();
+
+ /**
+ * Iterate over all keys in device registry.
+ * @param consumer key consumer
+ */
+ void forEach(Consumer<KEY> consumer);
+
+ /**
+ * Get device registry size.
+ * @return device registry size
+ */
+ int size();
+
+ @Override
+ void close();
+
+}
import com.google.common.base.Optional;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.List;
-import java.util.Map;
+import java.util.function.BiConsumer;
+import org.opendaylight.openflowplugin.api.openflow.registry.CommonDeviceRegistry;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowId;
/**
* Registry for mapping composite-key of flow ({@link FlowRegistryKey}) from device view
* to flow descriptor ({@link FlowDescriptor}) as the identifier of the same flow in data store.
*/
-public interface DeviceFlowRegistry extends AutoCloseable {
+public interface DeviceFlowRegistry extends CommonDeviceRegistry<FlowRegistryKey> {
ListenableFuture<List<Optional<FlowCapableNode>>> fill();
- FlowDescriptor retrieveIdForFlow(FlowRegistryKey flowRegistryKey);
+ void storeDescriptor(FlowRegistryKey flowRegistryKey, FlowDescriptor flowDescriptor);
- void store(FlowRegistryKey flowRegistryKey, FlowDescriptor flowDescriptor);
+ FlowDescriptor retrieveDescriptor(FlowRegistryKey flowRegistryKey);
- FlowId storeIfNecessary(FlowRegistryKey flowRegistryKey);
+ void forEachEntry(BiConsumer<FlowRegistryKey, FlowDescriptor> consumer);
- void removeDescriptor(FlowRegistryKey flowRegistryKey);
-
- void update(FlowRegistryKey newFlowRegistryKey,FlowDescriptor flowDescriptor);
-
- Map<FlowRegistryKey, FlowDescriptor> getAllFlowDescriptors();
-
- @Override
- void close();
-}
\ No newline at end of file
+}
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowId;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
-/**
- * Created by Martin Bobak <mbobak@cisco.com> on 9.4.2015.
- */
public interface FlowDescriptor {
FlowId getFlowId();
TableKey getTableKey();
+
}
package org.opendaylight.openflowplugin.api.openflow.registry.group;
-import java.util.List;
+import org.opendaylight.openflowplugin.api.openflow.registry.CommonDeviceRegistry;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupId;
-/**
- * Created by Martin Bobak <mbobak@cisco.com> on 15.4.2015.
- */
-public interface DeviceGroupRegistry extends AutoCloseable {
-
- void store(GroupId groupId);
-
- void markToBeremoved(GroupId groupId);
-
- void removeMarked();
-
- List<GroupId> getAllGroupIds();
+public interface DeviceGroupRegistry extends CommonDeviceRegistry<GroupId> {
- @Override
- void close();
}
package org.opendaylight.openflowplugin.api.openflow.registry.meter;
-import java.util.List;
+import org.opendaylight.openflowplugin.api.openflow.registry.CommonDeviceRegistry;
import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.MeterId;
-/**
- * Created by Martin Bobak <mbobak@cisco.com> on 15.4.2015.
- */
-public interface DeviceMeterRegistry extends AutoCloseable {
-
- void store(MeterId meterId);
-
- void markToBeremoved(MeterId meterId);
-
- void removeMarked();
-
- List<MeterId> getAllMeterIds();
-
- @Override
- void close();
+public interface DeviceMeterRegistry extends CommonDeviceRegistry<MeterId> {
}
//2. create registry key
final FlowRegistryKey flowRegKey = FlowRegistryKeyFactory.create(getDeviceInfo().getVersion(), flowRemovedNotification);
//3. lookup flowId
- final FlowDescriptor flowDescriptor = deviceFlowRegistry.retrieveIdForFlow(flowRegKey);
+ final FlowDescriptor flowDescriptor = deviceFlowRegistry.retrieveDescriptor(flowRegKey);
//4. if flowId present:
if (flowDescriptor != null) {
// a) construct flow path
import com.google.common.base.Optional;
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
-import com.google.common.collect.Maps;
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
+import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiConsumer;
import java.util.function.Consumer;
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.ThreadSafe;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+@ThreadSafe
public class DeviceFlowRegistryImpl implements DeviceFlowRegistry {
private static final Logger LOG = LoggerFactory.getLogger(DeviceFlowRegistryImpl.class);
private static final String ALIEN_SYSTEM_FLOW_ID = "#UF$TABLE*";
private static final AtomicInteger UNACCOUNTED_FLOWS_COUNTER = new AtomicInteger(0);
- private final BiMap<FlowRegistryKey, FlowDescriptor> flowRegistry = Maps.synchronizedBiMap(HashBiMap.create());
+ private final BiMap<FlowRegistryKey, FlowDescriptor> flowRegistry = HashBiMap.create();
+ private final List<FlowRegistryKey> marks = new ArrayList<>();
private final DataBroker dataBroker;
private final KeyedInstanceIdentifier<Node, NodeKey> instanceIdentifier;
private final List<ListenableFuture<List<Optional<FlowCapableNode>>>> lastFillFutures = new ArrayList<>();
if (!flowRegistry.containsKey(key)) {
LOG.trace("Found flow with table ID : {} and flow ID : {}", flow.getTableId(), flow.getId().getValue());
final FlowDescriptor descriptor = FlowDescriptorFactory.create(flow.getTableId(), flow.getId());
- store(key, descriptor);
+ storeDescriptor(key, descriptor);
}
};
}
@Override
- public ListenableFuture<List<Optional<FlowCapableNode>>> fill() {
+ @GuardedBy("this")
+ public synchronized ListenableFuture<List<Optional<FlowCapableNode>>> fill() {
LOG.debug("Filling flow registry with flows for node: {}", instanceIdentifier.getKey().getId().getValue());
// Prepare path for read transaction
final InstanceIdentifier<FlowCapableNode> path = instanceIdentifier.augmentation(FlowCapableNode.class);
// First, try to fill registry with flows from DS/Configuration
- CheckedFuture<Optional<FlowCapableNode>, ReadFailedException> configFuture = fillFromDatastore(LogicalDatastoreType.CONFIGURATION, path);
+ final CheckedFuture<Optional<FlowCapableNode>, ReadFailedException> configFuture = fillFromDatastore(LogicalDatastoreType.CONFIGURATION, path);
// Now, try to fill registry with flows from DS/Operational
// in case of cluster fail over, when clients are not using DS/Configuration
// for adding flows, but only RPCs
- CheckedFuture<Optional<FlowCapableNode>, ReadFailedException> operationalFuture = fillFromDatastore(LogicalDatastoreType.OPERATIONAL, path);
+ final CheckedFuture<Optional<FlowCapableNode>, ReadFailedException> operationalFuture = fillFromDatastore(LogicalDatastoreType.OPERATIONAL, path);
// And at last, chain and return futures created above.
// Also, cache this future, so call to DeviceFlowRegistry.close() will be able
return lastFillFuture;
}
+ @GuardedBy("this")
private CheckedFuture<Optional<FlowCapableNode>, ReadFailedException> fillFromDatastore(final LogicalDatastoreType logicalDatastoreType, final InstanceIdentifier<FlowCapableNode> path) {
// Create new read-only transaction
final ReadOnlyTransaction transaction = dataBroker.newReadOnlyTransaction();
// Bail out early if transaction is null
if (transaction == null) {
return Futures.immediateFailedCheckedFuture(
- new ReadFailedException("Read transaction is null"));
+ new ReadFailedException("Read transaction is null"));
}
// Prepare read operation from datastore for path
final CheckedFuture<Optional<FlowCapableNode>, ReadFailedException> future =
- transaction.read(logicalDatastoreType, path);
+ transaction.read(logicalDatastoreType, path);
// Bail out early if future is null
if (future == null) {
return Futures.immediateFailedCheckedFuture(
- new ReadFailedException("Future from read transaction is null"));
+ new ReadFailedException("Future from read transaction is null"));
}
Futures.addCallback(future, new FutureCallback<Optional<FlowCapableNode>>() {
@Override
public void onSuccess(Optional<FlowCapableNode> result) {
result.asSet().stream()
- .filter(Objects::nonNull)
- .filter(flowCapableNode -> Objects.nonNull(flowCapableNode.getTable()))
- .flatMap(flowCapableNode -> flowCapableNode.getTable().stream())
- .filter(Objects::nonNull)
- .filter(table -> Objects.nonNull(table.getFlow()))
- .flatMap(table -> table.getFlow().stream())
- .filter(Objects::nonNull)
- .filter(flow -> Objects.nonNull(flow.getId()))
- .forEach(flowConsumer);
+ .filter(Objects::nonNull)
+ .filter(flowCapableNode -> Objects.nonNull(flowCapableNode.getTable()))
+ .flatMap(flowCapableNode -> flowCapableNode.getTable().stream())
+ .filter(Objects::nonNull)
+ .filter(table -> Objects.nonNull(table.getFlow()))
+ .flatMap(table -> table.getFlow().stream())
+ .filter(Objects::nonNull)
+ .filter(flow -> Objects.nonNull(flow.getId()))
+ .forEach(flowConsumer);
// After we are done with reading from datastore, close the transaction
transaction.close();
}
@Override
- public FlowDescriptor retrieveIdForFlow(final FlowRegistryKey flowRegistryKey) {
+ @GuardedBy("this")
+ public synchronized FlowDescriptor retrieveDescriptor(final FlowRegistryKey flowRegistryKey) {
LOG.trace("Retrieving flow descriptor for flow hash : {}", flowRegistryKey.hashCode());
- FlowDescriptor flowDescriptor = flowRegistry.get(flowRegistryKey);
- // Get FlowDescriptor from flow registry
- if(flowDescriptor == null){
- if (LOG.isTraceEnabled()) {
- LOG.trace("Failed to retrieve flow descriptor for flow hash : {}, trying with custom equals method", flowRegistryKey.hashCode());
- }
- for(Map.Entry<FlowRegistryKey, FlowDescriptor> fd : flowRegistry.entrySet()) {
- if (flowRegistryKey.equals(fd.getKey())) {
- flowDescriptor = fd.getValue();
- break;
- }
- }
- }
- return flowDescriptor;
+ return flowRegistry.get(correctFlowRegistryKey(flowRegistry.keySet(), flowRegistryKey));
}
@Override
- public void store(final FlowRegistryKey flowRegistryKey, final FlowDescriptor flowDescriptor) {
+ @GuardedBy("this")
+ public synchronized void storeDescriptor(final FlowRegistryKey flowRegistryKey, final FlowDescriptor flowDescriptor) {
+ final FlowRegistryKey correctFlowRegistryKey = correctFlowRegistryKey(flowRegistry.keySet(), flowRegistryKey);
+
try {
- LOG.trace("Storing flowDescriptor with table ID : {} and flow ID : {} for flow hash : {}",
- flowDescriptor.getTableKey().getId(), flowDescriptor.getFlowId().getValue(), flowRegistryKey.hashCode());
- flowRegistry.put(flowRegistryKey, flowDescriptor);
+ if (hasMark(correctFlowRegistryKey)) {
+ // We are probably doing update of flow ID or table ID, so remove mark for removal of this flow
+ // and replace it with new value
+ marks.remove(correctFlowRegistryKey(marks, correctFlowRegistryKey));
+ flowRegistry.forcePut(correctFlowRegistryKey, flowDescriptor);
+ return;
+ }
+
+ LOG.trace("Storing flowDescriptor with table ID : {} and flow ID : {} for flow hash : {}",
+ flowDescriptor.getTableKey().getId(), flowDescriptor.getFlowId().getValue(), correctFlowRegistryKey.hashCode());
+
+ flowRegistry.put(correctFlowRegistryKey, flowDescriptor);
} catch (IllegalArgumentException ex) {
- LOG.warn("Flow with flowId {} already exists in table {}", flowDescriptor.getFlowId().getValue(),
- flowDescriptor.getTableKey().getId());
- final FlowId newFlowId = createAlienFlowId(flowDescriptor.getTableKey().getId());
- final FlowDescriptor newFlowDescriptor = FlowDescriptorFactory.
- create(flowDescriptor.getTableKey().getId(), newFlowId);
- flowRegistry.put(flowRegistryKey, newFlowDescriptor);
+ if (hasMark(flowRegistry.inverse().get(flowDescriptor))) {
+ // We are probably doing update of flow, but without changing flow ID or table ID, so we need to replace
+ // old value with new value, but keep the old value marked for removal
+ flowRegistry.forcePut(correctFlowRegistryKey, flowDescriptor);
+ return;
+ }
+
+ // We are trying to store new flow to flow registry, but we already have different flow with same flow ID
+ // stored in registry, so we need to create alien ID for this new flow here.
+ LOG.warn("Flow with flow ID {} already exists in table {}, generating alien flow ID", flowDescriptor.getFlowId().getValue(),
+ flowDescriptor.getTableKey().getId());
+
+ flowRegistry.put(
+ correctFlowRegistryKey,
+ FlowDescriptorFactory.create(
+ flowDescriptor.getTableKey().getId(),
+ createAlienFlowId(flowDescriptor.getTableKey().getId())));
}
}
@Override
- public void update(final FlowRegistryKey newFlowRegistryKey, final FlowDescriptor flowDescriptor) {
- LOG.trace("Updating the entry with hash: {}", newFlowRegistryKey.hashCode());
- flowRegistry.forcePut(newFlowRegistryKey, flowDescriptor);
+ @GuardedBy("this")
+ public synchronized void forEachEntry(final BiConsumer<FlowRegistryKey, FlowDescriptor> consumer) {
+ flowRegistry.forEach(consumer);
}
@Override
- public FlowId storeIfNecessary(final FlowRegistryKey flowRegistryKey) {
- LOG.trace("Trying to retrieve flow ID for flow hash : {}", flowRegistryKey.hashCode());
-
- // First, try to get FlowDescriptor from flow registry
- FlowDescriptor flowDescriptor = retrieveIdForFlow(flowRegistryKey);
-
- // We was not able to retrieve FlowDescriptor, so we will at least try to generate it
- if (flowDescriptor == null) {
- LOG.trace("Flow descriptor for flow hash : {} not found, generating alien flow ID", flowRegistryKey.hashCode());
+ @GuardedBy("this")
+ public synchronized void store(final FlowRegistryKey flowRegistryKey) {
+ if (Objects.isNull(retrieveDescriptor(flowRegistryKey))) {
+ // We do not found flow in flow registry, that means it do not have any ID already assigned, so we need
+ // to generate new alien flow ID here.
+ LOG.debug("Flow descriptor for flow hash : {} not found, generating alien flow ID", flowRegistryKey.hashCode());
final short tableId = flowRegistryKey.getTableId();
final FlowId alienFlowId = createAlienFlowId(tableId);
- flowDescriptor = FlowDescriptorFactory.create(tableId, alienFlowId);
+ final FlowDescriptor flowDescriptor = FlowDescriptorFactory.create(tableId, alienFlowId);
// Finally we got flowDescriptor, so now we will store it to registry,
// so next time we won't need to generate it again
- store(flowRegistryKey, flowDescriptor);
+ storeDescriptor(flowRegistryKey, flowDescriptor);
}
-
- return flowDescriptor.getFlowId();
}
@Override
- public void removeDescriptor(final FlowRegistryKey flowRegistryKey) {
+ @GuardedBy("this")
+ public synchronized void addMark(final FlowRegistryKey flowRegistryKey) {
LOG.trace("Removing flow descriptor for flow hash : {}", flowRegistryKey.hashCode());
- flowRegistry.remove(flowRegistryKey);
+ marks.add(flowRegistryKey);
}
@Override
- public Map<FlowRegistryKey, FlowDescriptor> getAllFlowDescriptors() {
- return Collections.unmodifiableMap(flowRegistry);
+ @GuardedBy("this")
+ public synchronized boolean hasMark(final FlowRegistryKey flowRegistryKey) {
+ return Objects.nonNull(flowRegistryKey) && marks.contains(correctFlowRegistryKey(marks, flowRegistryKey));
+
}
@Override
- public void close() {
+ @GuardedBy("this")
+ public synchronized void processMarks() {
+ // Remove all flows that was marked for removal from flow registry and clear all marks.
+ marks.forEach(flowRegistry::remove);
+ marks.clear();
+ }
+
+ @Override
+ @GuardedBy("this")
+ public synchronized void forEach(final Consumer<FlowRegistryKey> consumer) {
+ flowRegistry.keySet().forEach(consumer);
+ }
+
+ @Override
+ @GuardedBy("this")
+ public synchronized int size() {
+ return flowRegistry.size();
+ }
+
+ @Override
+ @GuardedBy("this")
+ public synchronized void close() {
final Iterator<ListenableFuture<List<Optional<FlowCapableNode>>>> iterator = lastFillFutures.iterator();
- while(iterator.hasNext()) {
+ // We need to force interrupt and clear all running futures that are trying to read flow IDs from datastore
+ while (iterator.hasNext()) {
final ListenableFuture<List<Optional<FlowCapableNode>>> next = iterator.next();
boolean success = next.cancel(true);
LOG.trace("Cancelling filling flow registry with flows job {} with result: {}", next, success);
}
flowRegistry.clear();
+ marks.clear();
+ }
+
+ @GuardedBy("this")
+ private FlowRegistryKey correctFlowRegistryKey(final Collection<FlowRegistryKey> flowRegistryKeys, final FlowRegistryKey key) {
+ if (Objects.isNull(key)) {
+ return null;
+ }
+
+ if (!flowRegistryKeys.contains(key)) {
+ // If we failed to compare FlowRegistryKey by hashCode, try to retrieve correct FlowRegistryKey
+ // from set of keys using custom comparator method for Match. This case can occur when we have different
+ // augmentations on extensions, or switch returned things like IP address or port in different format that
+ // we sent.
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Failed to retrieve flow descriptor for flow hash : {}, trying with custom equals method", key.hashCode());
+ }
+
+ for (final FlowRegistryKey flowRegistryKey : flowRegistryKeys) {
+ if (key.equals(flowRegistryKey)) {
+ return flowRegistryKey;
+ }
+ }
+ }
+
+ // If we failed to find key at all or key is already present in set of keys, just return original key
+ return key;
}
@VisibleForTesting
static FlowId createAlienFlowId(final short tableId) {
final String alienId = ALIEN_SYSTEM_FLOW_ID + tableId + '-' + UNACCOUNTED_FLOWS_COUNTER.incrementAndGet();
+ LOG.debug("Created alien flow id {} for table id {}", alienId, tableId);
return new FlowId(alienId);
}
+
+ @VisibleForTesting
+ Map<FlowRegistryKey, FlowDescriptor> getAllFlowDescriptors() {
+ return flowRegistry;
+ }
}
package org.opendaylight.openflowplugin.impl.registry.group;
+import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
+import java.util.function.Consumer;
import org.opendaylight.openflowplugin.api.openflow.registry.group.DeviceGroupRegistry;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupId;
-/**
- * Created by Martin Bobak <mbobak@cisco.com> on 15.4.2015.
- */
public class DeviceGroupRegistryImpl implements DeviceGroupRegistry {
- private final List<GroupId> groupIdList = new ArrayList<>();
- private final List<GroupId> marks = new ArrayList<>();
+ private final List<GroupId> groupIds = Collections.synchronizedList(new ArrayList<>());
+ private final List<GroupId> marks = Collections.synchronizedList(new ArrayList<>());
@Override
public void store(final GroupId groupId) {
- groupIdList.add(groupId);
+ groupIds.add(groupId);
}
@Override
- public void markToBeremoved(final GroupId groupId) {
+ public void addMark(final GroupId groupId) {
marks.add(groupId);
}
@Override
- public void removeMarked() {
- synchronized (groupIdList) {
- groupIdList.removeAll(marks);
- }
+ public boolean hasMark(final GroupId groupId) {
+ return marks.contains(groupId);
+ }
+
+ @Override
+ public void processMarks() {
+ groupIds.removeAll(marks);
marks.clear();
}
@Override
- public List<GroupId> getAllGroupIds() {
- return groupIdList;
+ public void forEach(final Consumer<GroupId> consumer) {
+ synchronized (groupIds) {
+ groupIds.forEach(consumer);
+ }
+ }
+
+ @Override
+ public int size() {
+ return groupIds.size();
}
@Override
public void close() {
- synchronized (groupIdList) {
- groupIdList.clear();
- }
- synchronized (marks) {
- marks.clear();
- }
+ groupIds.clear();
+ marks.clear();
+ }
+
+ @VisibleForTesting
+ List<GroupId> getAllGroupIds() {
+ return groupIds;
}
}
package org.opendaylight.openflowplugin.impl.registry.meter;
+import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
+import java.util.function.Consumer;
import org.opendaylight.openflowplugin.api.openflow.registry.meter.DeviceMeterRegistry;
import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.MeterId;
-/**
- * Created by Martin Bobak <mbobak@cisco.com> on 15.4.2015.
- */
public class DeviceMeterRegistryImpl implements DeviceMeterRegistry {
- private final List<MeterId> meterIds = new ArrayList<>();
- private final List<MeterId> marks = new ArrayList<>();
+ private final List<MeterId> meterIds = Collections.synchronizedList(new ArrayList<>());
+ private final List<MeterId> marks = Collections.synchronizedList(new ArrayList<>());
@Override
public void store(final MeterId meterId) {
}
@Override
- public void markToBeremoved(final MeterId meterId) {
+ public void addMark(final MeterId meterId) {
marks.add(meterId);
}
@Override
- public void removeMarked() {
+ public boolean hasMark(final MeterId meterId) {
+ return marks.contains(meterId);
+ }
+
+ @Override
+ public void processMarks() {
+ meterIds.removeAll(marks);
+ marks.clear();
+ }
+
+ @Override
+ public void forEach(final Consumer<MeterId> consumer) {
synchronized (meterIds) {
- meterIds.removeAll(marks);
- }
- synchronized (marks) {
- marks.clear();
+ meterIds.forEach(consumer);
}
}
@Override
- public List<MeterId> getAllMeterIds() {
- return meterIds;
+ public int size() {
+ return meterIds.size();
}
@Override
public void close() {
- synchronized (meterIds) {
- meterIds.clear();
- }
- synchronized (marks) {
- marks.clear();
- }
+ meterIds.clear();
+ marks.clear();
+ }
+
+ @VisibleForTesting
+ List<MeterId> getAllMeterIds() {
+ return meterIds;
}
}
StatisticsGatheringUtils.writeFlowStatistics(allMultipartData, deviceInfo, registry, txFacade);
if (!multipartReply.getFlags().isOFPMPFREQMORE()) {
endCollecting();
+ registry.processMarks();
}
}
}
if (Objects.nonNull(input.getFlowRef())) {
final FlowId flowId = input.getFlowRef().getValue().firstKeyOf(Flow.class, FlowKey.class).getId();
flowDescriptor = FlowDescriptorFactory.create(input.getTableId(), flowId);
- deviceContext.getDeviceFlowRegistry().store(flowRegistryKey, flowDescriptor);
+ deviceContext.getDeviceFlowRegistry().storeDescriptor(flowRegistryKey, flowDescriptor);
} else {
- final FlowId flowId = deviceContext.getDeviceFlowRegistry().storeIfNecessary(flowRegistryKey);
- flowDescriptor = FlowDescriptorFactory.create(input.getTableId(), flowId);
+ deviceContext.getDeviceFlowRegistry().store(flowRegistryKey);
+ flowDescriptor = deviceContext.getDeviceFlowRegistry().retrieveDescriptor(flowRegistryKey);
}
if (LOG.isDebugEnabled()) {
LOG.debug("Flow remove finished without error for flow={}", input);
}
FlowRegistryKey flowRegistryKey = FlowRegistryKeyFactory.create(deviceContext.getDeviceInfo().getVersion(), input);
- deviceContext.getDeviceFlowRegistry().removeDescriptor(flowRegistryKey);
+ deviceContext.getDeviceFlowRegistry().addMark(flowRegistryKey);
if (itemLifecycleListener != null) {
- final FlowDescriptor flowDescriptor =
- deviceContext.getDeviceFlowRegistry().retrieveIdForFlow(flowRegistryKey);
+ final FlowDescriptor flowDescriptor = deviceContext.getDeviceFlowRegistry().retrieveDescriptor(flowRegistryKey);
+
if (flowDescriptor != null) {
KeyedInstanceIdentifier<Flow, FlowKey> flowPath = createFlowPath(flowDescriptor,
deviceContext.getDeviceInfo().getNodeInstanceIdentifier());
final OriginalFlow original = input.getOriginalFlow();
final FlowRegistryKey origFlowRegistryKey = FlowRegistryKeyFactory.create(deviceContext.getDeviceInfo().getVersion(), original);
final FlowRegistryKey updatedFlowRegistryKey = FlowRegistryKeyFactory.create(deviceContext.getDeviceInfo().getVersion(), updated);
- final FlowDescriptor origFlowDescriptor = deviceFlowRegistry.retrieveIdForFlow(origFlowRegistryKey);
+ final FlowDescriptor origFlowDescriptor = deviceFlowRegistry.retrieveDescriptor(origFlowRegistryKey);
final boolean isUpdate = Objects.nonNull(origFlowDescriptor);
- final FlowId fLowId = Objects.nonNull(input.getFlowRef())
- ? input.getFlowRef().getValue().firstKeyOf(Flow.class).getId()
- : isUpdate ? origFlowDescriptor.getFlowId() : deviceFlowRegistry.storeIfNecessary(updatedFlowRegistryKey);
- final FlowDescriptor updatedFlowDescriptor = FlowDescriptorFactory.create(updated.getTableId(), fLowId);
+ final FlowDescriptor updatedFlowDescriptor;
+
+ if (Objects.nonNull(input.getFlowRef())) {
+ updatedFlowDescriptor = FlowDescriptorFactory.create(updated.getTableId(), input.getFlowRef().getValue().firstKeyOf(Flow.class).getId());
+ } else {
+ if (isUpdate) {
+ updatedFlowDescriptor = origFlowDescriptor;
+ } else {
+ deviceFlowRegistry.store(updatedFlowRegistryKey);
+ updatedFlowDescriptor = deviceFlowRegistry.retrieveDescriptor(updatedFlowRegistryKey);
+ }
+ }
+
if (isUpdate) {
- deviceFlowRegistry.removeDescriptor(origFlowRegistryKey);
- deviceFlowRegistry.store(updatedFlowRegistryKey, updatedFlowDescriptor);
+ deviceFlowRegistry.addMark(origFlowRegistryKey);
+ deviceFlowRegistry.storeDescriptor(updatedFlowRegistryKey, updatedFlowDescriptor);
}
if (itemLifecycleListener != null) {
if (LOG.isDebugEnabled()) {
LOG.debug("Group remove with id={} finished without error", input.getGroupId().getValue());
}
- removeGroup.getDeviceRegistry().getDeviceGroupRegistry().markToBeremoved(input.getGroupId());
+ removeGroup.getDeviceRegistry().getDeviceGroupRegistry().addMark(input.getGroupId());
removeIfNecessaryFromDS(input.getGroupId());
} else {
if (LOG.isDebugEnabled()) {
@Override
public Future<RpcResult<RemoveMeterOutput>> removeMeter(final RemoveMeterInput input) {
- removeMeter.getDeviceRegistry().getDeviceMeterRegistry().markToBeremoved(input.getMeterId());
final ListenableFuture<RpcResult<RemoveMeterOutput>> resultFuture = removeMeter.handleServiceCall(input);
Futures.addCallback(resultFuture, new FutureCallback<RpcResult<RemoveMeterOutput>>() {
@Override
if (LOG.isDebugEnabled()) {
LOG.debug("Meter remove with id={} finished without error", input.getMeterId());
}
+ removeMeter.getDeviceRegistry().getDeviceMeterRegistry().addMark(input.getMeterId());
removeIfNecessaryFromDS(input.getMeterId());
} else {
if (LOG.isDebugEnabled()) {
private static KeyedInstanceIdentifier<org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter, MeterKey> createMeterPath(final MeterId meterId, final KeyedInstanceIdentifier<Node, NodeKey> nodePath) {
return nodePath.augmentation(FlowCapableNode.class).child(org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter.class, new MeterKey(meterId));
}
-}
\ No newline at end of file
+}
writeFlowStatistics(data, deviceInfo, flowRegistry, txFacade);
txFacade.submitTransaction();
EventsTimeCounter.markEnd(eventIdentifier);
+ flowRegistry.processMarks();
return Boolean.TRUE;
});
}
final short tableId = flowStat.getTableId();
final FlowRegistryKey flowRegistryKey = FlowRegistryKeyFactory.create(deviceInfo.getVersion(), flowBuilder.build());
- final FlowId flowId = registry.storeIfNecessary(flowRegistryKey);
+ registry.store(flowRegistryKey);
+ final FlowId flowId = registry.retrieveDescriptor(flowRegistryKey).getFlowId();
final FlowKey flowKey = new FlowKey(flowId);
flowBuilder.setKey(flowKey);
final DeviceMeterRegistry meterRegistry,
final InstanceIdentifier<FlowCapableNode> flowNodeIdent,
final TxFacade txFacade) throws TransactionChainClosedException {
- for (final MeterId meterId : meterRegistry.getAllMeterIds()) {
+ meterRegistry.forEach(meterId -> {
final InstanceIdentifier<Meter> meterIdent = flowNodeIdent.child(Meter.class, new MeterKey(meterId));
txFacade.addDeleteToTxChain(LogicalDatastoreType.OPERATIONAL, meterIdent);
- }
- meterRegistry.removeMarked();
+ });
+ meterRegistry.processMarks();
}
private static void processGroupDescStats(
final TxFacade txFacade,
final InstanceIdentifier<FlowCapableNode> flowNodeIdent,
final DeviceGroupRegistry groupRegistry) throws TransactionChainClosedException {
- for (final GroupId groupId : groupRegistry.getAllGroupIds()) {
+ groupRegistry.forEach(groupId -> {
final InstanceIdentifier<Group> groupIdent = flowNodeIdent.child(Group.class, new GroupKey(groupId));
txFacade.addDeleteToTxChain(LogicalDatastoreType.OPERATIONAL, groupIdent);
- }
- groupRegistry.removeMarked();
+ });
+ groupRegistry.processMarks();
}
private static void processGroupStatistics(
import org.opendaylight.openflowplugin.impl.registry.flow.FlowRegistryKeyFactory;
import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.data.FlowStatsResponseConvertorData;
-import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.data.VersionDatapathIdConvertorData;
import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.match.MatchReactor;
import org.opendaylight.yang.gen.v1.urn.opendaylight.direct.statistics.rev160511.GetFlowStatisticsInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.direct.statistics.rev160511.GetFlowStatisticsOutput;
.addAugmentation(FlowStatisticsData.class, flowStatisticsDataBld.build());
final FlowRegistryKey flowRegistryKey = FlowRegistryKeyFactory.create(getVersion(), flowBuilder.build());
- return getDeviceRegistry().getDeviceFlowRegistry().storeIfNecessary(flowRegistryKey);
+ getDeviceRegistry().getDeviceFlowRegistry().store(flowRegistryKey);
+ return getDeviceRegistry().getDeviceFlowRegistry().retrieveDescriptor(flowRegistryKey).getFlowId();
}
}
// insert flow+flowId into local registry
final FlowRegistryKey flowRegKey = FlowRegistryKeyFactory.create(deviceInfo.getVersion(), flowRemovedMdsalBld.build());
final FlowDescriptor flowDescriptor = FlowDescriptorFactory.create((short) 0, new FlowId("ut-ofp:f456"));
- deviceContext.getDeviceFlowRegistry().store(flowRegKey, flowDescriptor);
+ deviceContext.getDeviceFlowRegistry().storeDescriptor(flowRegKey, flowDescriptor);
// plug in lifecycleListener
final ItemLifecycleListener itemLifecycleListener = Mockito.mock(ItemLifecycleListener.class);
public void testOnDeviceDisconnected() throws Exception {
final DeviceTerminationPhaseHandler deviceContextClosedHandler = mock(DeviceTerminationPhaseHandler.class);
- assertEquals(0, deviceContext.getDeviceFlowRegistry().getAllFlowDescriptors().size());
- assertEquals(0, deviceContext.getDeviceGroupRegistry().getAllGroupIds().size());
- assertEquals(0, deviceContext.getDeviceMeterRegistry().getAllMeterIds().size());
+ assertEquals(0, deviceContext.getDeviceFlowRegistry().size());
+ assertEquals(0, deviceContext.getDeviceGroupRegistry().size());
+ assertEquals(0, deviceContext.getDeviceMeterRegistry().size());
}
descriptor = FlowDescriptorFactory.create(key.getTableId(), new FlowId("ut:1"));
Assert.assertEquals(0, deviceFlowRegistry.getAllFlowDescriptors().size());
- deviceFlowRegistry.store(key, descriptor);
+ deviceFlowRegistry.storeDescriptor(key, descriptor);
Assert.assertEquals(1, deviceFlowRegistry.getAllFlowDescriptors().size());
}
order.verify(readOnlyTransaction).read(LogicalDatastoreType.OPERATIONAL, path);
assertTrue(allFlowDescriptors.containsKey(key));
- deviceFlowRegistry.removeDescriptor(key);
+ deviceFlowRegistry.addMark(key);
}
@Test
@Test
public void testRetrieveIdForFlow() throws Exception {
- Assert.assertEquals(descriptor, deviceFlowRegistry.retrieveIdForFlow(key));
+ Assert.assertEquals(descriptor, deviceFlowRegistry.retrieveDescriptor(key));
}
@Test
public void testStore() throws Exception {
//store the same key with different value
final FlowDescriptor descriptor2 = FlowDescriptorFactory.create(key.getTableId(), new FlowId("ut:2"));
- deviceFlowRegistry.store(key, descriptor2);
+ deviceFlowRegistry.storeDescriptor(key, descriptor2);
Assert.assertEquals(1, deviceFlowRegistry.getAllFlowDescriptors().size());
- Assert.assertEquals("ut:2", deviceFlowRegistry.retrieveIdForFlow(key).getFlowId().getValue());
+ Assert.assertEquals("ut:2", deviceFlowRegistry.retrieveDescriptor(key).getFlowId().getValue());
// store new key with old value
final FlowAndStatisticsMapList flowStats = TestFlowHelper.createFlowAndStatisticsMapListBuilder(2).build();
final FlowRegistryKey key2 = FlowRegistryKeyFactory.create(OFConstants.OFP_VERSION_1_3, flowStats);
- deviceFlowRegistry.store(key2, descriptor);
+ deviceFlowRegistry.storeDescriptor(key2, descriptor);
Assert.assertEquals(2, deviceFlowRegistry.getAllFlowDescriptors().size());
- Assert.assertEquals("ut:1", deviceFlowRegistry.retrieveIdForFlow(key2).getFlowId().getValue());
+ Assert.assertEquals("ut:1", deviceFlowRegistry.retrieveDescriptor(key2).getFlowId().getValue());
}
@Test
FlowId newFlowId;
//store existing key
- newFlowId = deviceFlowRegistry.storeIfNecessary(key);
+ deviceFlowRegistry.store(key);
+ newFlowId = deviceFlowRegistry.retrieveDescriptor(key).getFlowId();
Assert.assertEquals(1, deviceFlowRegistry.getAllFlowDescriptors().size());
- Assert.assertEquals(descriptor, deviceFlowRegistry.retrieveIdForFlow(key));
+ Assert.assertEquals(descriptor, deviceFlowRegistry.retrieveDescriptor(key));
Assert.assertEquals(descriptor.getFlowId(), newFlowId);
//store new key
final String alienPrefix = "#UF$TABLE*2-";
final FlowRegistryKey key2 = FlowRegistryKeyFactory.create(OFConstants.OFP_VERSION_1_3, TestFlowHelper.createFlowAndStatisticsMapListBuilder(2).build());
- newFlowId = deviceFlowRegistry.storeIfNecessary(key2);
+ deviceFlowRegistry.store(key2);
+ newFlowId = deviceFlowRegistry.retrieveDescriptor(key2).getFlowId();
Assert.assertTrue(newFlowId.getValue().startsWith(alienPrefix));
- Assert.assertTrue(deviceFlowRegistry.retrieveIdForFlow(key2).getFlowId().getValue().startsWith(alienPrefix));
+ Assert.assertTrue(deviceFlowRegistry.retrieveDescriptor(key2).getFlowId().getValue().startsWith(alienPrefix));
Assert.assertEquals(2, deviceFlowRegistry.getAllFlowDescriptors().size());
}
@Test
public void testRemoveDescriptor() throws Exception {
- deviceFlowRegistry.removeDescriptor(key);
- Assert.assertEquals(0, deviceFlowRegistry.getAllFlowDescriptors().size());
+ deviceFlowRegistry.addMark(key);
+ Assert.assertEquals(1, deviceFlowRegistry.getAllFlowDescriptors().size());
}
@Test
@Test
public void testRemoveMarked() throws Exception {
- deviceGroupRegistry.markToBeremoved(groupId);
- deviceGroupRegistry.removeMarked();
+ deviceGroupRegistry.addMark(groupId);
+ deviceGroupRegistry.processMarks();
Assert.assertEquals(0, deviceGroupRegistry.getAllGroupIds().size());
}
@Test
public void testRemoveMarkedNegative() throws Exception {
- deviceGroupRegistry.markToBeremoved(groupId2);
- deviceGroupRegistry.removeMarked();
+ deviceGroupRegistry.addMark(groupId2);
+ deviceGroupRegistry.processMarks();
Assert.assertEquals(1, deviceGroupRegistry.getAllGroupIds().size());
}
@Test
public void testClose() throws Exception {
- deviceGroupRegistry.markToBeremoved(groupId);
+ deviceGroupRegistry.addMark(groupId);
deviceGroupRegistry.close();
Assert.assertEquals(0, deviceGroupRegistry.getAllGroupIds().size());
deviceGroupRegistry.store(groupId);
Assert.assertEquals(1, deviceGroupRegistry.getAllGroupIds().size());
- deviceGroupRegistry.removeMarked();
+ deviceGroupRegistry.processMarks();
Assert.assertEquals(1, deviceGroupRegistry.getAllGroupIds().size());
}
-}
\ No newline at end of file
+}
@Test
public void testRemoveMarked() throws Exception {
- deviceMeterRegistry.markToBeremoved(meterId);
- deviceMeterRegistry.removeMarked();
+ deviceMeterRegistry.addMark(meterId);
+ deviceMeterRegistry.processMarks();
Assert.assertEquals(0, deviceMeterRegistry.getAllMeterIds().size());
}
@Test
public void testRemoveMarkedNegative() throws Exception {
- deviceMeterRegistry.markToBeremoved(meterId2);
- deviceMeterRegistry.removeMarked();
+ deviceMeterRegistry.addMark(meterId2);
+ deviceMeterRegistry.processMarks();
Assert.assertEquals(1, deviceMeterRegistry.getAllMeterIds().size());
}
@Test
public void testClose() throws Exception {
- deviceMeterRegistry.markToBeremoved(meterId);
+ deviceMeterRegistry.addMark(meterId);
deviceMeterRegistry.close();
Assert.assertEquals(0, deviceMeterRegistry.getAllMeterIds().size());
deviceMeterRegistry.store(meterId);
Assert.assertEquals(1, deviceMeterRegistry.getAllMeterIds().size());
- deviceMeterRegistry.removeMarked();
+ deviceMeterRegistry.processMarks();
Assert.assertEquals(1, deviceMeterRegistry.getAllMeterIds().size());
}
-}
\ No newline at end of file
+}
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
when(mockedDeviceContext.getDeviceState()).thenReturn(mockedDeviceState);
when(mockedDeviceContext.getDeviceInfo()).thenReturn(mockedDeviceInfo);
when(mockedDeviceContext.getDeviceFlowRegistry()).thenReturn(mockedFlowRegistry);
- when(mockedFlowRegistry.retrieveIdForFlow(Matchers.any(FlowRegistryKey.class))).thenReturn(mockedFlowDescriptor);
+ when(mockedFlowRegistry.retrieveDescriptor(Matchers.any(FlowRegistryKey.class))).thenReturn(mockedFlowDescriptor);
final InstanceIdentifier<FlowCapableNode> nodePath = mockedDeviceInfo.getNodeInstanceIdentifier().augmentation(FlowCapableNode.class);
final FlowCapableNodeBuilder flowNodeBuilder = new FlowCapableNodeBuilder();
when(mockedFlowDescriptor.getFlowId()).thenReturn(flowId);
when(mockedFlowDescriptor.getTableKey()).thenReturn(new TableKey(DUMMY_TABLE_ID));
- when(deviceFlowRegistry.storeIfNecessary(Matchers.any(FlowRegistryKey.class))).thenReturn(flowId);
- when(deviceFlowRegistry.retrieveIdForFlow(Matchers.any(FlowRegistryKey.class))).thenReturn(mockedFlowDescriptor);
+ when(deviceFlowRegistry.retrieveDescriptor(Matchers.any(FlowRegistryKey.class))).thenReturn(mockedFlowDescriptor);
}
private <T extends DataObject> void verifyOutput(Future<RpcResult<T>> rpcResultFuture) throws ExecutionException, InterruptedException {
salGroupService.removeGroup(removeGroupInput);
verify(mockedRequestContextStack).createRequestContext();
- verify(mockedDeviceGroupRegistry).markToBeremoved(eq(dummyGroupId));
+ verify(mockedDeviceGroupRegistry).addMark(eq(dummyGroupId));
if (itemLifecycleListener != null) {
verify(itemLifecycleListener).onRemoved(Matchers.<KeyedInstanceIdentifier<Group, GroupKey>>any());
salMeterService.removeMeter(removeMeterInput);
verify(mockedRequestContextStack).createRequestContext();
- verify(mockedDeviceMeterRegistry).markToBeremoved(eq(dummyMeterId));
+ verify(mockedDeviceMeterRegistry).addMark(eq(dummyMeterId));
if (itemLifecycleListener != null) {
verify(itemLifecycleListener).onRemoved(Matchers.<KeyedInstanceIdentifier<Meter, MeterKey>>any());
when(deviceContext.getDeviceFlowRegistry()).thenReturn(deviceFlowRegistry);
when(deviceContext.getDeviceGroupRegistry()).thenReturn(deviceGroupRegistry);
when(deviceContext.getDeviceMeterRegistry()).thenReturn(deviceMeterRegistry);
- when(deviceFlowRegistry.retrieveIdForFlow(Matchers.any(FlowRegistryKey.class))).thenReturn(flowDescriptor);
+ when(deviceFlowRegistry.retrieveDescriptor(Matchers.any(FlowRegistryKey.class))).thenReturn(flowDescriptor);
when(deviceContext.getReadTransaction()).thenReturn(readTx);
when(txFacade.getReadTransaction()).thenReturn(readTx);
when(deviceContext.getPrimaryConnectionContext()).thenReturn(connectionAdapter);
verify(deviceContext, Mockito.never()).addDeleteToTxChain(
Matchers.eq(LogicalDatastoreType.OPERATIONAL), Matchers.<InstanceIdentifier<?>>any());
- verify(deviceGroupRegistry).removeMarked();
+ verify(deviceGroupRegistry).processMarks();
verify(deviceGroupRegistry).store(storedGroupId);
verify(txFacade).writeToTransaction(
Matchers.eq(LogicalDatastoreType.OPERATIONAL), Matchers.eq(groupPath), Matchers.any(Group.class));
import java.util.List;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.openflowplugin.api.openflow.registry.flow.DeviceFlowRegistry;
+import org.opendaylight.openflowplugin.impl.registry.flow.FlowDescriptorFactory;
+import org.opendaylight.openflowplugin.impl.statistics.services.direct.AbstractDirectStatisticsServiceTest;
import org.opendaylight.yang.gen.v1.urn.opendaylight.direct.statistics.rev160511.GetFlowStatisticsInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.direct.statistics.rev160511.GetFlowStatisticsOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowId;
public void setUp() throws Exception {
service = new FlowDirectStatisticsService(requestContextStack, deviceContext, convertorManager);
final DeviceFlowRegistry registry = mock(DeviceFlowRegistry.class);
- when(registry.storeIfNecessary(any())).thenReturn(new FlowId("1"));
+ when(registry.retrieveDescriptor(any())).thenReturn(FlowDescriptorFactory.create(TABLE_NO, new FlowId("1")));
when(deviceContext.getDeviceFlowRegistry()).thenReturn(registry);
}
service.storeStatistics(output);
verify(deviceContext).writeToTransactionWithParentsSlow(eq(LogicalDatastoreType.OPERATIONAL), any(), any());
}
-}
\ No newline at end of file
+}