/*
 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.openflowplugin.impl.registry.flow;
10 import com.google.common.base.Optional;
11 import com.google.common.util.concurrent.CheckedFuture;
12 import com.romix.scala.collection.concurrent.TrieMap;
13 import java.util.Collection;
14 import java.util.Collections;
15 import java.util.HashSet;
16 import java.util.List;
18 import java.util.concurrent.ConcurrentMap;
19 import java.util.concurrent.ExecutionException;
20 import javax.annotation.concurrent.GuardedBy;
21 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
22 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
23 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
24 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
25 import org.opendaylight.openflowplugin.api.openflow.registry.flow.DeviceFlowRegistry;
26 import org.opendaylight.openflowplugin.api.openflow.registry.flow.FlowDescriptor;
27 import org.opendaylight.openflowplugin.api.openflow.registry.flow.FlowRegistryKey;
28 import org.opendaylight.openflowplugin.impl.util.FlowUtil;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowId;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
35 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
36 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
37 import org.slf4j.Logger;
38 import org.slf4j.LoggerFactory;
/**
 * Created by Martin Bobak &lt;mbobak@cisco.com&gt; on 8.4.2015.
 */
43 public class DeviceFlowRegistryImpl implements DeviceFlowRegistry {
44 private static final Logger LOG = LoggerFactory.getLogger(DeviceFlowRegistryImpl.class);
46 private final ConcurrentMap<FlowRegistryKey, FlowDescriptor> flowRegistry = new TrieMap<>();
48 private final Collection<FlowRegistryKey> marks = new HashSet<>();
49 private final DataBroker dataBroker;
/**
 * Creates a registry that reads already stored flows through the given data broker.
 *
 * @param dataBroker broker from which read-only transactions are obtained
 */
public DeviceFlowRegistryImpl(final DataBroker dataBroker) {
    this.dataBroker = dataBroker;
}
56 public void fill(final KeyedInstanceIdentifier<Node, NodeKey> instanceIdentifier) {
57 LOG.trace("Filling flow registry with flows for node: {}", instanceIdentifier);
59 // Prepare path for read transaction
60 // TODO: Read only Tables, and not entire FlowCapableNode (fix Yang model)
61 final InstanceIdentifier<FlowCapableNode> path = instanceIdentifier.augmentation(FlowCapableNode.class);
63 // First, try to fill registry with flows from DS/Configuration
64 fillFromDatastore(LogicalDatastoreType.CONFIGURATION, path);
66 // Now, try to fill registry with flows from DS/Operational
67 // in case of cluster fail over, when clients are not using DS/Configuration
68 // for adding flows, but only RPCs
69 fillFromDatastore(LogicalDatastoreType.OPERATIONAL, path);
72 private void fillFromDatastore(final LogicalDatastoreType logicalDatastoreType, final InstanceIdentifier<FlowCapableNode> path) {
73 // Create new read-only transaction
74 final ReadOnlyTransaction transaction = dataBroker.newReadOnlyTransaction();
76 // Bail out early if transaction is null
77 if (transaction == null) {
81 // Prepare read operation from datastore for path
82 final CheckedFuture<Optional<FlowCapableNode>, ReadFailedException> future =
83 transaction.read(logicalDatastoreType, path);
85 // Bail out early if future is null
91 // Synchronously read all data in path
92 final Optional<FlowCapableNode> data = future.get();
94 if (data.isPresent()) {
95 final List<Table> tables = data.get().getTable();
98 for (Table table : tables) {
99 final List<Flow> flows = table.getFlow();
102 // If we finally got some flows, store each of them in registry if needed
103 for (Flow flow : table.getFlow()) {
104 final FlowRegistryKey key = FlowRegistryKeyFactory.create(flow);
106 // Now, we will update the registry, but we will also try to prevent duplicate entries
107 if (!flowRegistry.containsKey(key)) {
108 LOG.trace("Reading and storing flowDescriptor with table ID : {} and flow ID : {}",
110 flow.getId().getValue());
112 final FlowDescriptor descriptor = FlowDescriptorFactory.create(
116 flowRegistry.put(key, descriptor);
123 } catch (InterruptedException | ExecutionException e) {
124 LOG.error("Read transaction for identifier {} failed with exception: {}", path, e);
127 // After we are done with reading from datastore, close the transaction
132 public FlowDescriptor retrieveIdForFlow(final FlowRegistryKey flowRegistryKey) {
133 LOG.trace("Trying to retrieve flowDescriptor for flow hash: {}", flowRegistryKey.hashCode());
135 // First, try to get FlowDescriptor from flow registry
136 FlowDescriptor flowDescriptor = flowRegistry.get(flowRegistryKey);
138 // We was not able to retrieve FlowDescriptor, so we will at least try to generate it
139 if (flowDescriptor == null) {
140 final short tableId = flowRegistryKey.getTableId();
141 final FlowId alienFlowId = FlowUtil.createAlienFlowId(tableId);
142 flowDescriptor = FlowDescriptorFactory.create(tableId, alienFlowId);
144 // Finally we got flowDescriptor, so now we will store it to registry,
145 // so next time we won't need to generate it again
146 store(flowRegistryKey, flowDescriptor);
149 return flowDescriptor;
153 public void store(final FlowRegistryKey flowRegistryKey, final FlowDescriptor flowDescriptor) {
154 LOG.trace("Storing flowDescriptor with table ID : {} and flow ID : {} for flow hash : {}", flowDescriptor.getTableKey().getId(), flowDescriptor.getFlowId().getValue(), flowRegistryKey.hashCode());
155 flowRegistry.put(flowRegistryKey, flowDescriptor);
159 public FlowId storeIfNecessary(final FlowRegistryKey flowRegistryKey) {
160 // We will simply reuse retrieveIdForFlow to get or generate FlowDescriptor and store it if needed
161 final FlowDescriptor flowDescriptor = retrieveIdForFlow(flowRegistryKey);
162 return flowDescriptor.getFlowId();
166 public void markToBeremoved(final FlowRegistryKey flowRegistryKey) {
167 synchronized (marks) {
168 marks.add(flowRegistryKey);
170 LOG.trace("Flow hash {} was marked for removal.", flowRegistryKey.hashCode());
174 public void removeMarked() {
175 synchronized (marks) {
176 for (FlowRegistryKey flowRegistryKey : marks) {
177 LOG.trace("Removing flowDescriptor for flow hash : {}", flowRegistryKey.hashCode());
178 flowRegistry.remove(flowRegistryKey);
186 public Map<FlowRegistryKey, FlowDescriptor> getAllFlowDescriptors() {
187 return Collections.unmodifiableMap(flowRegistry);
191 public void close() {
192 flowRegistry.clear();