Merge "Bug 5936 - DeviceFlowRegistry flowId bug"
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / registry / flow / DeviceFlowRegistryImpl.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.impl.registry.flow;
9
10 import com.google.common.base.Optional;
11 import com.google.common.util.concurrent.CheckedFuture;
12 import com.romix.scala.collection.concurrent.TrieMap;
13 import java.util.Collection;
14 import java.util.Collections;
15 import java.util.HashSet;
16 import java.util.List;
17 import java.util.Map;
18 import java.util.concurrent.ConcurrentMap;
19 import java.util.concurrent.ExecutionException;
20 import javax.annotation.concurrent.GuardedBy;
21 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
22 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
23 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
24 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
25 import org.opendaylight.openflowplugin.api.openflow.registry.flow.DeviceFlowRegistry;
26 import org.opendaylight.openflowplugin.api.openflow.registry.flow.FlowDescriptor;
27 import org.opendaylight.openflowplugin.api.openflow.registry.flow.FlowRegistryKey;
28 import org.opendaylight.openflowplugin.impl.util.FlowUtil;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowId;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
35 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
36 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
37 import org.slf4j.Logger;
38 import org.slf4j.LoggerFactory;
39
40 /**
41  * Created by Martin Bobak <mbobak@cisco.com> on 8.4.2015.
42  */
43 public class DeviceFlowRegistryImpl implements DeviceFlowRegistry {
44     private static final Logger LOG = LoggerFactory.getLogger(DeviceFlowRegistryImpl.class);
45
46     private final ConcurrentMap<FlowRegistryKey, FlowDescriptor> flowRegistry = new TrieMap<>();
47     @GuardedBy("marks")
48     private final Collection<FlowRegistryKey> marks = new HashSet<>();
49     private final DataBroker dataBroker;
50
51     public DeviceFlowRegistryImpl(final DataBroker dataBroker) {
52         this.dataBroker = dataBroker;
53     }
54
55     @Override
56     public void fill(final KeyedInstanceIdentifier<Node, NodeKey> instanceIdentifier) {
57         LOG.trace("Filling flow registry with flows for node: {}", instanceIdentifier);
58
59         // Prepare path for read transaction
60         // TODO: Read only Tables, and not entire FlowCapableNode (fix Yang model)
61         final InstanceIdentifier<FlowCapableNode> path = instanceIdentifier.augmentation(FlowCapableNode.class);
62
63         // First, try to fill registry with flows from DS/Configuration
64         fillFromDatastore(LogicalDatastoreType.CONFIGURATION, path);
65
66         // Now, try to fill registry with flows from DS/Operational
67         // in case of cluster fail over, when clients are not using DS/Configuration
68         // for adding flows, but only RPCs
69         fillFromDatastore(LogicalDatastoreType.OPERATIONAL, path);
70     }
71
72     private void fillFromDatastore(final LogicalDatastoreType logicalDatastoreType, final InstanceIdentifier<FlowCapableNode> path) {
73         // Create new read-only transaction
74         final ReadOnlyTransaction transaction = dataBroker.newReadOnlyTransaction();
75
76         // Bail out early if transaction is null
77         if (transaction == null) {
78             return;
79         }
80
81         // Prepare read operation from datastore for path
82         final CheckedFuture<Optional<FlowCapableNode>, ReadFailedException> future =
83                 transaction.read(logicalDatastoreType, path);
84
85         // Bail out early if future is null
86         if (future == null) {
87             return;
88         }
89
90         try {
91             // Synchronously read all data in path
92             final Optional<FlowCapableNode> data = future.get();
93
94             if (data.isPresent()) {
95                 final List<Table> tables = data.get().getTable();
96
97                 if (tables != null) {
98                     for (Table table : tables) {
99                         final List<Flow> flows = table.getFlow();
100
101                         if (flows != null) {
102                             // If we finally got some flows, store each of them in registry if needed
103                             for (Flow flow : table.getFlow()) {
104                                 final FlowRegistryKey key = FlowRegistryKeyFactory.create(flow);
105
106                                 // Now, we will update the registry, but we will also try to prevent duplicate entries
107                                 if (!flowRegistry.containsKey(key)) {
108                                     LOG.trace("Reading and storing flowDescriptor with table ID : {} and flow ID : {}",
109                                             flow.getTableId(),
110                                             flow.getId().getValue());
111
112                                     final FlowDescriptor descriptor = FlowDescriptorFactory.create(
113                                             flow.getTableId(),
114                                             flow.getId());
115
116                                     flowRegistry.put(key, descriptor);
117                                 }
118                             }
119                         }
120                     }
121                 }
122             }
123         } catch (InterruptedException | ExecutionException e) {
124             LOG.error("Read transaction for identifier {} failed with exception: {}", path, e);
125         }
126
127         // After we are done with reading from datastore, close the transaction
128         transaction.close();
129     }
130
131     @Override
132     public FlowDescriptor retrieveIdForFlow(final FlowRegistryKey flowRegistryKey) {
133         LOG.trace("Trying to retrieve flowDescriptor for flow hash: {}", flowRegistryKey.hashCode());
134
135         // First, try to get FlowDescriptor from flow registry
136         FlowDescriptor flowDescriptor = flowRegistry.get(flowRegistryKey);
137
138         // We was not able to retrieve FlowDescriptor, so we will at least try to generate it
139         if (flowDescriptor == null) {
140             final short tableId = flowRegistryKey.getTableId();
141             final FlowId alienFlowId = FlowUtil.createAlienFlowId(tableId);
142             flowDescriptor = FlowDescriptorFactory.create(tableId, alienFlowId);
143
144             // Finally we got flowDescriptor, so now we will store it to registry,
145             // so next time we won't need to generate it again
146             store(flowRegistryKey, flowDescriptor);
147         }
148
149         return flowDescriptor;
150     }
151
152     @Override
153     public void store(final FlowRegistryKey flowRegistryKey, final FlowDescriptor flowDescriptor) {
154         LOG.trace("Storing flowDescriptor with table ID : {} and flow ID : {} for flow hash : {}", flowDescriptor.getTableKey().getId(), flowDescriptor.getFlowId().getValue(), flowRegistryKey.hashCode());
155         flowRegistry.put(flowRegistryKey, flowDescriptor);
156     }
157
158     @Override
159     public FlowId storeIfNecessary(final FlowRegistryKey flowRegistryKey) {
160         // We will simply reuse retrieveIdForFlow to get or generate FlowDescriptor and store it if needed
161         final FlowDescriptor flowDescriptor = retrieveIdForFlow(flowRegistryKey);
162         return flowDescriptor.getFlowId();
163     }
164
165     @Override
166     public void markToBeremoved(final FlowRegistryKey flowRegistryKey) {
167         synchronized (marks) {
168             marks.add(flowRegistryKey);
169         }
170         LOG.trace("Flow hash {} was marked for removal.", flowRegistryKey.hashCode());
171     }
172
173     @Override
174     public void removeMarked() {
175         synchronized (marks) {
176             for (FlowRegistryKey flowRegistryKey : marks) {
177                 LOG.trace("Removing flowDescriptor for flow hash : {}", flowRegistryKey.hashCode());
178                 flowRegistry.remove(flowRegistryKey);
179             }
180
181             marks.clear();
182         }
183     }
184
185     @Override
186     public Map<FlowRegistryKey, FlowDescriptor> getAllFlowDescriptors() {
187         return Collections.unmodifiableMap(flowRegistry);
188     }
189
190     @Override
191     public void close() {
192         flowRegistry.clear();
193         marks.clear();
194     }
195 }