X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=openflowplugin-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fopenflowplugin%2Fimpl%2Fregistry%2Fflow%2FDeviceFlowRegistryImpl.java;h=01a92d00f439d6a8ea68858eadff433cc5d38f16;hb=0ac8877c3539d82e55ffe49dc0a2c8bcb0c82b19;hp=ace0da644f073beb4b9842da7d266e013dd769eb;hpb=741bbb57bb064956cd7ed65af1cdea1d4361abf1;p=openflowplugin.git diff --git a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/registry/flow/DeviceFlowRegistryImpl.java b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/registry/flow/DeviceFlowRegistryImpl.java index ace0da644f..01a92d00f4 100644 --- a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/registry/flow/DeviceFlowRegistryImpl.java +++ b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/registry/flow/DeviceFlowRegistryImpl.java @@ -5,77 +5,235 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - package org.opendaylight.openflowplugin.impl.registry.flow; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Optional; +import com.google.common.collect.BiMap; +import com.google.common.collect.HashBiMap; +import com.google.common.collect.Maps; +import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; import java.util.ArrayList; -import java.util.HashMap; +import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; +import org.opendaylight.controller.md.sal.binding.api.DataBroker; +import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction; +import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; import org.opendaylight.openflowplugin.api.openflow.registry.flow.DeviceFlowRegistry; import org.opendaylight.openflowplugin.api.openflow.registry.flow.FlowDescriptor; -import org.opendaylight.openflowplugin.api.openflow.registry.flow.FlowHash; -import org.opendaylight.openflowplugin.api.openflow.registry.flow.FlowRegistryException; +import org.opendaylight.openflowplugin.api.openflow.registry.flow.FlowRegistryKey; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowId; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey; +import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; +import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - -/** - * Created by Martin Bobak <mbobak@cisco.com> on 8.4.2015. - */ public class DeviceFlowRegistryImpl implements DeviceFlowRegistry { - - private final Map flowRegistry = new HashMap<>(); - private final List marks = new ArrayList(); private static final Logger LOG = LoggerFactory.getLogger(DeviceFlowRegistryImpl.class); + private static final String ALIEN_SYSTEM_FLOW_ID = "#UF$TABLE*"; + private static final AtomicInteger UNACCOUNTED_FLOWS_COUNTER = new AtomicInteger(0); + + private final BiMap flowRegistry = Maps.synchronizedBiMap(HashBiMap.create()); + private final DataBroker dataBroker; + private final KeyedInstanceIdentifier instanceIdentifier; + private final List>>> lastFillFutures = new ArrayList<>(); + private final Consumer flowConsumer; + + public DeviceFlowRegistryImpl(final short version, final DataBroker dataBroker, final KeyedInstanceIdentifier instanceIdentifier) { + this.dataBroker = dataBroker; + this.instanceIdentifier = instanceIdentifier; + + // Specifies what to do with flow read from datastore + flowConsumer = flow -> { + // Create flow registry key from flow + final FlowRegistryKey key = FlowRegistryKeyFactory.create(version, flow); + + // Now, we will update the registry, but we will also try to prevent duplicate entries + if (!flowRegistry.containsKey(key)) { + LOG.trace("Found flow with table ID : {} and flow ID : {}", flow.getTableId(), flow.getId().getValue()); + final FlowDescriptor descriptor = FlowDescriptorFactory.create(flow.getTableId(), flow.getId()); + store(key, descriptor); + } + }; + } @Override - public FlowDescriptor retrieveIdForFlow(final FlowHash flowHash) { - FlowDescriptor flowDescriptor = flowRegistry.get(flowHash); - return flowDescriptor; + public ListenableFuture>> fill() { + LOG.debug("Filling flow registry with flows for node: {}", instanceIdentifier.getKey().getId().getValue()); + + // Prepare path for read transaction + // TODO: Read only Tables, and not entire FlowCapableNode (fix Yang model) + final InstanceIdentifier path = instanceIdentifier.augmentation(FlowCapableNode.class); + + // First, try to fill registry with flows from DS/Configuration + CheckedFuture, ReadFailedException> configFuture = fillFromDatastore(LogicalDatastoreType.CONFIGURATION, path); + + // Now, try to fill registry with flows from DS/Operational + // in case of cluster fail over, when clients are not using DS/Configuration + // for adding flows, but only RPCs + CheckedFuture, ReadFailedException> operationalFuture = fillFromDatastore(LogicalDatastoreType.OPERATIONAL, path); + + // And at last, chain and return futures created above. + // Also, cache this future, so call to DeviceFlowRegistry.close() will be able + // to cancel this future immediately if it will be still in progress + final ListenableFuture>> lastFillFuture = Futures.allAsList(Arrays.asList(configFuture, operationalFuture)); + lastFillFutures.add(lastFillFuture); + return lastFillFuture; } + private CheckedFuture, ReadFailedException> fillFromDatastore(final LogicalDatastoreType logicalDatastoreType, final InstanceIdentifier path) { + // Create new read-only transaction + final ReadOnlyTransaction transaction = dataBroker.newReadOnlyTransaction(); + + // Bail out early if transaction is null + if (transaction == null) { + return Futures.immediateFailedCheckedFuture( + new ReadFailedException("Read transaction is null")); + } + + // Prepare read operation from datastore for path + final CheckedFuture, ReadFailedException> future = + transaction.read(logicalDatastoreType, path); + + // Bail out early if future is null + if (future == null) { + return Futures.immediateFailedCheckedFuture( + new ReadFailedException("Future from read transaction is null")); + } + + Futures.addCallback(future, new FutureCallback>() { + @Override + public void onSuccess(Optional result) { + result.asSet().stream() + .filter(Objects::nonNull) + .filter(flowCapableNode -> Objects.nonNull(flowCapableNode.getTable())) + .flatMap(flowCapableNode -> flowCapableNode.getTable().stream()) + .filter(Objects::nonNull) + .filter(table -> Objects.nonNull(table.getFlow())) + .flatMap(table -> table.getFlow().stream()) + .filter(Objects::nonNull) + .filter(flow -> Objects.nonNull(flow.getId())) + .forEach(flowConsumer); + + // After we are done with reading from datastore, close the transaction + transaction.close(); + } + + @Override + public void onFailure(Throwable t) { + // Even when read operation failed, close the transaction + transaction.close(); + } + }); + + return future; + } @Override - public void store(final FlowHash flowHash, final FlowDescriptor flowDescriptor) { - LOG.trace("Storing flowDescriptor with table ID : {} and flow ID : {} for flow hash : {}", flowDescriptor.getTableKey().getId(), flowDescriptor.getFlowId().getValue(), flowHash.hashCode()); - synchronized (flowRegistry) { - flowRegistry.put(flowHash, flowDescriptor); + public FlowDescriptor retrieveIdForFlow(final FlowRegistryKey flowRegistryKey) { + LOG.trace("Retrieving flow descriptor for flow hash : {}", flowRegistryKey.hashCode()); + FlowDescriptor flowDescriptor = flowRegistry.get(flowRegistryKey); + // Get FlowDescriptor from flow registry + if(flowDescriptor == null){ + if (LOG.isTraceEnabled()) { + LOG.trace("Failed to retrieve flow descriptor for flow hash : {}, trying with custom equals method", flowRegistryKey.hashCode()); + } + for(Map.Entry fd : flowRegistry.entrySet()) { + if (fd.getKey().equals(flowRegistryKey)) { + flowDescriptor = fd.getValue(); + break; + } + } } + return flowDescriptor; } @Override - public void markToBeremoved(final FlowHash flowHash) { - synchronized (marks) { - marks.add(flowHash); + public void store(final FlowRegistryKey flowRegistryKey, final FlowDescriptor flowDescriptor) { + try { + LOG.trace("Storing flowDescriptor with table ID : {} and flow ID : {} for flow hash : {}", + flowDescriptor.getTableKey().getId(), flowDescriptor.getFlowId().getValue(), flowRegistryKey.hashCode()); + flowRegistry.put(flowRegistryKey, flowDescriptor); + } catch (IllegalArgumentException ex) { + LOG.warn("Flow with flowId {} already exists in table {}", flowDescriptor.getFlowId().getValue(), + flowDescriptor.getTableKey().getId()); + final FlowId newFlowId = createAlienFlowId(flowDescriptor.getTableKey().getId()); + final FlowDescriptor newFlowDescriptor = FlowDescriptorFactory. + create(flowDescriptor.getTableKey().getId(), newFlowId); + flowRegistry.put(flowRegistryKey, newFlowDescriptor); } - LOG.trace("Flow hash {} was marked for removal.", flowHash.hashCode()); + } + @Override + public void update(final FlowRegistryKey newFlowRegistryKey, final FlowDescriptor flowDescriptor) { + LOG.trace("Updating the entry with hash: {}", newFlowRegistryKey.hashCode()); + flowRegistry.forcePut(newFlowRegistryKey, flowDescriptor); } @Override - public void removeMarked() { - synchronized (flowRegistry) { - for (FlowHash flowHash : marks) { - LOG.trace("Removing flowDescriptor for flow hash : {}", flowHash.hashCode()); - flowRegistry.remove(flowHash); - } - } - synchronized (marks) { - marks.clear(); + public FlowId storeIfNecessary(final FlowRegistryKey flowRegistryKey) { + LOG.trace("Trying to retrieve flow ID for flow hash : {}", flowRegistryKey.hashCode()); + + // First, try to get FlowDescriptor from flow registry + FlowDescriptor flowDescriptor = retrieveIdForFlow(flowRegistryKey); + + // We was not able to retrieve FlowDescriptor, so we will at least try to generate it + if (flowDescriptor == null) { + LOG.trace("Flow descriptor for flow hash : {} not found, generating alien flow ID", flowRegistryKey.hashCode()); + final short tableId = flowRegistryKey.getTableId(); + final FlowId alienFlowId = createAlienFlowId(tableId); + flowDescriptor = FlowDescriptorFactory.create(tableId, alienFlowId); + + // Finally we got flowDescriptor, so now we will store it to registry, + // so next time we won't need to generate it again + store(flowRegistryKey, flowDescriptor); } + + return flowDescriptor.getFlowId(); } + @Override + public void removeDescriptor(final FlowRegistryKey flowRegistryKey) { + LOG.trace("Removing flow descriptor for flow hash : {}", flowRegistryKey.hashCode()); + flowRegistry.remove(flowRegistryKey); + } @Override - public Map getAllFlowDescriptors() { - return flowRegistry; + public Map getAllFlowDescriptors() { + return Collections.unmodifiableMap(flowRegistry); } @Override - public void close() throws Exception { + public void close() { + final Iterator>>> iterator = lastFillFutures.iterator(); + + while(iterator.hasNext()) { + final ListenableFuture>> next = iterator.next(); + boolean success = next.cancel(true); + LOG.trace("Cancelling filling flow registry with flows job {} with result: {}", next, success); + iterator.remove(); + } + flowRegistry.clear(); - marks.clear(); + } + + @VisibleForTesting + static FlowId createAlienFlowId(final short tableId) { + final String alienId = ALIEN_SYSTEM_FLOW_ID + tableId + '-' + UNACCOUNTED_FLOWS_COUNTER.incrementAndGet(); + return new FlowId(alienId); } }