X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;ds=sidebyside;f=openflowplugin-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fopenflowplugin%2Fimpl%2Fregistry%2Fflow%2FDeviceFlowRegistryImpl.java;h=0fee0d39aadaeda0f5280055282574e5ee43514b;hb=05f8db12159673d0e0a95642fe86e62c14b7dc7b;hp=bb53d3b5708096f198087cc34e66a07933dbd924;hpb=6916d4f22823d56b391d0d45cb8544260def3431;p=openflowplugin.git diff --git a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/registry/flow/DeviceFlowRegistryImpl.java b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/registry/flow/DeviceFlowRegistryImpl.java index bb53d3b570..0fee0d39aa 100644 --- a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/registry/flow/DeviceFlowRegistryImpl.java +++ b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/registry/flow/DeviceFlowRegistryImpl.java @@ -8,28 +8,27 @@ package org.opendaylight.openflowplugin.impl.registry.flow; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Optional; import com.google.common.collect.BiMap; import com.google.common.collect.HashBiMap; import com.google.common.collect.Maps; -import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.util.concurrent.FluentFuture; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import java.util.ArrayList; import java.util.Arrays; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; -import javax.annotation.Nonnull; -import javax.annotation.concurrent.ThreadSafe; -import org.opendaylight.controller.md.sal.binding.api.DataBroker; -import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction; -import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; -import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; +import org.eclipse.jdt.annotation.NonNull; +import org.opendaylight.mdsal.binding.api.DataBroker; +import org.opendaylight.mdsal.binding.api.ReadTransaction; +import org.opendaylight.mdsal.common.api.LogicalDatastoreType; import org.opendaylight.openflowplugin.api.openflow.registry.flow.DeviceFlowRegistry; import org.opendaylight.openflowplugin.api.openflow.registry.flow.FlowDescriptor; import org.opendaylight.openflowplugin.api.openflow.registry.flow.FlowRegistryKey; @@ -38,12 +37,16 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.Fl import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.general.rev140714.GeneralAugMatchNodesNodeTableFlow; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier; +import org.opendaylight.yangtools.yang.common.Uint8; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -@ThreadSafe +/* + * this class is marked to be thread safe + */ public class DeviceFlowRegistryImpl implements DeviceFlowRegistry { private static final Logger LOG = LoggerFactory.getLogger(DeviceFlowRegistryImpl.class); private static final String ALIEN_SYSTEM_FLOW_ID = "#UF$TABLE*"; @@ -65,7 +68,7 @@ public class DeviceFlowRegistryImpl implements DeviceFlowRegistry { flowConsumer = flow -> { final FlowRegistryKey flowRegistryKey = FlowRegistryKeyFactory.create(version, flow); - if (!flowRegistry.containsKey(flowRegistryKey)) { + if (getExistingKey(flowRegistryKey) == null) { // Now, we will update the registry storeDescriptor(flowRegistryKey, FlowDescriptorFactory.create(flow.getTableId(), flow.getId())); } @@ -83,13 +86,13 @@ public class DeviceFlowRegistryImpl implements DeviceFlowRegistry { final InstanceIdentifier path = instanceIdentifier.augmentation(FlowCapableNode.class); // First, try to fill registry with flows from DS/Configuration - final CheckedFuture, ReadFailedException> configFuture = + final FluentFuture> configFuture = fillFromDatastore(LogicalDatastoreType.CONFIGURATION, path); // Now, try to fill registry with flows from DS/Operational // in case of cluster fail over, when clients are not using DS/Configuration // for adding flows, but only RPCs - final CheckedFuture, ReadFailedException> operationalFuture = + final FluentFuture> operationalFuture = fillFromDatastore(LogicalDatastoreType.OPERATIONAL, path); // And at last, chain and return futures created above. @@ -101,77 +104,61 @@ public class DeviceFlowRegistryImpl implements DeviceFlowRegistry { return lastFillFuture; } - private CheckedFuture, ReadFailedException> - fillFromDatastore(final LogicalDatastoreType logicalDatastoreType, + private FluentFuture> fillFromDatastore(final LogicalDatastoreType logicalDatastoreType, final InstanceIdentifier path) { - // Create new read-only transaction - final ReadOnlyTransaction transaction = dataBroker.newReadOnlyTransaction(); - - // Bail out early if transaction is null - if (transaction == null) { - return Futures.immediateFailedCheckedFuture( - new ReadFailedException("Read transaction is null")); - } - // Prepare read operation from datastore for path - final CheckedFuture, ReadFailedException> future = - transaction.read(logicalDatastoreType, path); - - // Bail out early if future is null - if (future == null) { - return Futures.immediateFailedCheckedFuture( - new ReadFailedException("Future from read transaction is null")); + final FluentFuture> future; + try (ReadTransaction transaction = dataBroker.newReadOnlyTransaction()) { + future = transaction.read(logicalDatastoreType, path); } - Futures.addCallback(future, new FutureCallback>() { + future.addCallback(new FutureCallback>() { @Override - public void onSuccess(Optional result) { - result.asSet().stream() - .filter(Objects::nonNull) - .filter(flowCapableNode -> Objects.nonNull(flowCapableNode.getTable())) - .flatMap(flowCapableNode -> flowCapableNode.getTable().stream()) - .filter(Objects::nonNull) - .filter(table -> Objects.nonNull(table.getFlow())) - .flatMap(table -> table.getFlow().stream()) - .filter(Objects::nonNull) - .filter(flow -> Objects.nonNull(flow.getId())) - .forEach(flowConsumer); - - // After we are done with reading from datastore, close the transaction - transaction.close(); + public void onSuccess(final Optional result) { + result.ifPresent(flowCapableNode -> { + flowCapableNode.nonnullTable().values().stream() + .filter(Objects::nonNull) + .flatMap(table -> table.nonnullFlow().values().stream()) + .filter(Objects::nonNull) + .filter(flow -> flow.getId() != null) + .forEach(flowConsumer); + }); } @Override - public void onFailure(Throwable throwable) { - // Even when read operation failed, close the transaction - transaction.close(); + public void onFailure(final Throwable throwable) { + LOG.debug("Failed to read {} path {}", logicalDatastoreType, path, throwable); } - }); + }, MoreExecutors.directExecutor()); return future; } @Override - public FlowDescriptor retrieveDescriptor(@Nonnull final FlowRegistryKey flowRegistryKey) { + public FlowDescriptor retrieveDescriptor(@NonNull final FlowRegistryKey flowRegistryKey) { if (LOG.isTraceEnabled()) { - LOG.trace("Retrieving flow descriptor for flow hash : {}", flowRegistryKey.hashCode()); + LOG.trace("Retrieving flow descriptor for flow registry : {}", flowRegistryKey.toString()); } - return flowRegistry.get(flowRegistryKey); + FlowRegistryKey existingFlowRegistryKey = getExistingKey(flowRegistryKey); + if (existingFlowRegistryKey != null) { + return flowRegistry.get(existingFlowRegistryKey); + } + return null; } @Override - public void storeDescriptor(@Nonnull final FlowRegistryKey flowRegistryKey, - @Nonnull final FlowDescriptor flowDescriptor) { + public void storeDescriptor(@NonNull final FlowRegistryKey flowRegistryKey, + @NonNull final FlowDescriptor flowDescriptor) { try { if (LOG.isTraceEnabled()) { LOG.trace("Storing flowDescriptor with table ID : {} and flow ID : {} for flow hash : {}", flowDescriptor.getTableKey().getId(), flowDescriptor.getFlowId().getValue(), - flowRegistryKey.hashCode()); + flowRegistryKey.toString()); } - flowRegistry.put(flowRegistryKey, flowDescriptor); + addToFlowRegistry(flowRegistryKey, flowDescriptor); } catch (IllegalArgumentException ex) { if (LOG.isWarnEnabled()) { LOG.warn("Flow with flow ID {} already exists in table {}, generating alien flow ID", @@ -181,7 +168,7 @@ public class DeviceFlowRegistryImpl implements DeviceFlowRegistry { // We are trying to store new flow to flow registry, but we already have different flow with same flow ID // stored in registry, so we need to create alien ID for this new flow here. - flowRegistry.put( + addToFlowRegistry( flowRegistryKey, FlowDescriptorFactory.create( flowDescriptor.getTableKey().getId(), @@ -191,29 +178,23 @@ public class DeviceFlowRegistryImpl implements DeviceFlowRegistry { @Override public void store(final FlowRegistryKey flowRegistryKey) { - if (Objects.isNull(retrieveDescriptor(flowRegistryKey))) { - if (LOG.isDebugEnabled()) { - LOG.debug("Flow descriptor for flow hash : {} not found, generating alien flow ID", - flowRegistryKey.hashCode()); - } + if (retrieveDescriptor(flowRegistryKey) == null) { + LOG.debug("Flow descriptor for flow hash : {} not found, generating alien flow ID", flowRegistryKey); // We do not found flow in flow registry, that means it do not have any ID already assigned, so we need // to generate new alien flow ID here. - storeDescriptor( - flowRegistryKey, - FlowDescriptorFactory.create( - flowRegistryKey.getTableId(), - createAlienFlowId(flowRegistryKey.getTableId()))); + final Uint8 tableId = Uint8.valueOf(flowRegistryKey.getTableId()); + storeDescriptor(flowRegistryKey, FlowDescriptorFactory.create(tableId, createAlienFlowId(tableId))); } } @Override public void addMark(final FlowRegistryKey flowRegistryKey) { if (LOG.isTraceEnabled()) { - LOG.trace("Removing flow descriptor for flow hash : {}", flowRegistryKey.hashCode()); + LOG.trace("Removing flow descriptor for flow hash : {}", flowRegistryKey.toString()); } - flowRegistry.remove(flowRegistryKey); + removeFromFlowRegistry(flowRegistryKey); } @Override @@ -249,12 +230,52 @@ public class DeviceFlowRegistryImpl implements DeviceFlowRegistry { } @VisibleForTesting - static FlowId createAlienFlowId(final short tableId) { + static FlowId createAlienFlowId(final Uint8 tableId) { final String alienId = ALIEN_SYSTEM_FLOW_ID + tableId + '-' + UNACCOUNTED_FLOWS_COUNTER.incrementAndGet(); LOG.debug("Created alien flow id {} for table id {}", alienId, tableId); return new FlowId(alienId); } + //Hashcode generation of the extension augmentation can differ for the same object received from the datastore and + // the one received after deserialization of switch message. OpenFlowplugin extensions are list, and the order in + // which it can receive the extensions back from switch can differ and that lead to a different hashcode. In that + // scenario, hashcode won't match and flowRegistry return the related key. To overcome this issue, these methods + // make sure that key is stored only if it doesn't equals to any existing key. + private void addToFlowRegistry(final FlowRegistryKey flowRegistryKey, final FlowDescriptor flowDescriptor) { + FlowRegistryKey existingFlowRegistryKey = getExistingKey(flowRegistryKey); + if (existingFlowRegistryKey == null) { + flowRegistry.put(flowRegistryKey, flowDescriptor); + } else { + flowRegistry.put(existingFlowRegistryKey, flowDescriptor); + } + } + + private void removeFromFlowRegistry(final FlowRegistryKey flowRegistryKey) { + FlowRegistryKey existingFlowRegistryKey = getExistingKey(flowRegistryKey); + if (existingFlowRegistryKey != null) { + flowRegistry.remove(existingFlowRegistryKey); + } else { + flowRegistry.remove(flowRegistryKey); + } + } + + private FlowRegistryKey getExistingKey(final FlowRegistryKey flowRegistryKey) { + if (flowRegistryKey.getMatch().augmentation(GeneralAugMatchNodesNodeTableFlow.class) == null) { + if (flowRegistry.containsKey(flowRegistryKey)) { + return flowRegistryKey; + } + } else { + synchronized (flowRegistry) { + for (Map.Entry keyValueSet : flowRegistry.entrySet()) { + if (keyValueSet.getKey().equals(flowRegistryKey)) { + return keyValueSet.getKey(); + } + } + } + } + return null; + } + @VisibleForTesting Map getAllFlowDescriptors() { return flowRegistry;