Merge "Simplify DeviceFlowRegistryImpl streaming"
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / registry / flow / DeviceFlowRegistryImpl.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.impl.registry.flow;
9
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.collect.BiMap;
12 import com.google.common.collect.HashBiMap;
13 import com.google.common.collect.Maps;
14 import com.google.common.util.concurrent.FluentFuture;
15 import com.google.common.util.concurrent.FutureCallback;
16 import com.google.common.util.concurrent.Futures;
17 import com.google.common.util.concurrent.ListenableFuture;
18 import com.google.common.util.concurrent.MoreExecutors;
19 import java.util.ArrayList;
20 import java.util.Arrays;
21 import java.util.Iterator;
22 import java.util.List;
23 import java.util.Map;
24 import java.util.Objects;
25 import java.util.Optional;
26 import java.util.concurrent.atomic.AtomicInteger;
27 import java.util.function.Consumer;
28 import javax.annotation.Nonnull;
29 import javax.annotation.concurrent.ThreadSafe;
30 import org.opendaylight.mdsal.binding.api.DataBroker;
31 import org.opendaylight.mdsal.binding.api.ReadTransaction;
32 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
33 import org.opendaylight.openflowplugin.api.openflow.registry.flow.DeviceFlowRegistry;
34 import org.opendaylight.openflowplugin.api.openflow.registry.flow.FlowDescriptor;
35 import org.opendaylight.openflowplugin.api.openflow.registry.flow.FlowRegistryKey;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowId;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.general.rev140714.GeneralAugMatchNodesNodeTableFlow;
42 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
43 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
44 import org.slf4j.Logger;
45 import org.slf4j.LoggerFactory;
46
47 @ThreadSafe
48 public class DeviceFlowRegistryImpl implements DeviceFlowRegistry {
49     private static final Logger LOG = LoggerFactory.getLogger(DeviceFlowRegistryImpl.class);
50     private static final String ALIEN_SYSTEM_FLOW_ID = "#UF$TABLE*";
51     private static final AtomicInteger UNACCOUNTED_FLOWS_COUNTER = new AtomicInteger(0);
52
53     private final BiMap<FlowRegistryKey, FlowDescriptor> flowRegistry = Maps.synchronizedBiMap(HashBiMap.create());
54     private final DataBroker dataBroker;
55     private final KeyedInstanceIdentifier<Node, NodeKey> instanceIdentifier;
56     private final List<ListenableFuture<List<Optional<FlowCapableNode>>>> lastFillFutures = new ArrayList<>();
57     private final Consumer<Flow> flowConsumer;
58
59     public DeviceFlowRegistryImpl(final short version,
60                                   final DataBroker dataBroker,
61                                   final KeyedInstanceIdentifier<Node, NodeKey> instanceIdentifier) {
62         this.dataBroker = dataBroker;
63         this.instanceIdentifier = instanceIdentifier;
64
65         // Specifies what to do with flow read from data store
66         flowConsumer = flow -> {
67             final FlowRegistryKey flowRegistryKey = FlowRegistryKeyFactory.create(version, flow);
68
69             if (getExistingKey(flowRegistryKey) == null) {
70                 // Now, we will update the registry
71                 storeDescriptor(flowRegistryKey, FlowDescriptorFactory.create(flow.getTableId(), flow.getId()));
72             }
73         };
74     }
75
76     @Override
77     public ListenableFuture<List<Optional<FlowCapableNode>>> fill() {
78         if (LOG.isDebugEnabled()) {
79             LOG.debug("Filling flow registry with flows for node: {}", instanceIdentifier.getKey().getId().getValue());
80         }
81
82         // Prepare path for read transaction
83         // TODO: Read only Tables, and not entire FlowCapableNode (fix Yang model)
84         final InstanceIdentifier<FlowCapableNode> path = instanceIdentifier.augmentation(FlowCapableNode.class);
85
86         // First, try to fill registry with flows from DS/Configuration
87         final FluentFuture<Optional<FlowCapableNode>> configFuture =
88                 fillFromDatastore(LogicalDatastoreType.CONFIGURATION, path);
89
90         // Now, try to fill registry with flows from DS/Operational
91         // in case of cluster fail over, when clients are not using DS/Configuration
92         // for adding flows, but only RPCs
93         final FluentFuture<Optional<FlowCapableNode>> operationalFuture =
94                 fillFromDatastore(LogicalDatastoreType.OPERATIONAL, path);
95
96         // And at last, chain and return futures created above.
97         // Also, cache this future, so call to DeviceFlowRegistry.close() will be able
98         // to cancel this future immediately if it will be still in progress
99         final ListenableFuture<List<Optional<FlowCapableNode>>> lastFillFuture =
100                 Futures.allAsList(Arrays.asList(configFuture, operationalFuture));
101         lastFillFutures.add(lastFillFuture);
102         return lastFillFuture;
103     }
104
105     private FluentFuture<Optional<FlowCapableNode>> fillFromDatastore(final LogicalDatastoreType logicalDatastoreType,
106                               final InstanceIdentifier<FlowCapableNode> path) {
107         // Prepare read operation from datastore for path
108         final FluentFuture<Optional<FlowCapableNode>> future;
109         try (ReadTransaction transaction = dataBroker.newReadOnlyTransaction()) {
110             future = transaction.read(logicalDatastoreType, path);
111         }
112
113         future.addCallback(new FutureCallback<Optional<FlowCapableNode>>() {
114             @Override
115             public void onSuccess(Optional<FlowCapableNode> result) {
116                 result.ifPresent(flowCapableNode -> {
117                     flowCapableNode.nonnullTable().stream()
118                     .filter(Objects::nonNull)
119                     .flatMap(table -> table.nonnullFlow().stream())
120                     .filter(Objects::nonNull)
121                     .filter(flow -> flow.getId() != null)
122                     .forEach(flowConsumer);
123                 });
124             }
125
126             @Override
127             public void onFailure(Throwable throwable) {
128                 LOG.debug("Failed to read {} path {}", logicalDatastoreType, path, throwable);
129             }
130         }, MoreExecutors.directExecutor());
131
132         return future;
133     }
134
135     @Override
136     public FlowDescriptor retrieveDescriptor(@Nonnull final FlowRegistryKey flowRegistryKey) {
137         if (LOG.isTraceEnabled()) {
138             LOG.trace("Retrieving flow descriptor for flow registry : {}", flowRegistryKey.toString());
139         }
140
141         FlowRegistryKey existingFlowRegistryKey = getExistingKey(flowRegistryKey);
142         if (existingFlowRegistryKey != null) {
143             return flowRegistry.get(existingFlowRegistryKey);
144         }
145         return null;
146     }
147
148     @Override
149     public void storeDescriptor(@Nonnull final FlowRegistryKey flowRegistryKey,
150                                 @Nonnull final FlowDescriptor flowDescriptor) {
151         try {
152             if (LOG.isTraceEnabled()) {
153                 LOG.trace("Storing flowDescriptor with table ID : {} and flow ID : {} for flow hash : {}",
154                         flowDescriptor.getTableKey().getId(),
155                         flowDescriptor.getFlowId().getValue(),
156                         flowRegistryKey.toString());
157             }
158
159             addToFlowRegistry(flowRegistryKey, flowDescriptor);
160         } catch (IllegalArgumentException ex) {
161             if (LOG.isWarnEnabled()) {
162                 LOG.warn("Flow with flow ID {} already exists in table {}, generating alien flow ID",
163                         flowDescriptor.getFlowId().getValue(),
164                         flowDescriptor.getTableKey().getId());
165             }
166
167             // We are trying to store new flow to flow registry, but we already have different flow with same flow ID
168             // stored in registry, so we need to create alien ID for this new flow here.
169             addToFlowRegistry(
170                     flowRegistryKey,
171                     FlowDescriptorFactory.create(
172                             flowDescriptor.getTableKey().getId(),
173                             createAlienFlowId(flowDescriptor.getTableKey().getId())));
174         }
175     }
176
177     @Override
178     public void store(final FlowRegistryKey flowRegistryKey) {
179         if (retrieveDescriptor(flowRegistryKey) == null) {
180             LOG.debug("Flow descriptor for flow hash : {} not found, generating alien flow ID", flowRegistryKey);
181
182             // We do not found flow in flow registry, that means it do not have any ID already assigned, so we need
183             // to generate new alien flow ID here.
184             storeDescriptor(
185                     flowRegistryKey,
186                     FlowDescriptorFactory.create(
187                             flowRegistryKey.getTableId(),
188                             createAlienFlowId(flowRegistryKey.getTableId())));
189         }
190     }
191
192     @Override
193     public void addMark(final FlowRegistryKey flowRegistryKey) {
194         if (LOG.isTraceEnabled()) {
195             LOG.trace("Removing flow descriptor for flow hash : {}", flowRegistryKey.toString());
196         }
197
198         removeFromFlowRegistry(flowRegistryKey);
199     }
200
201     @Override
202     public void processMarks() {
203         // Do nothing
204     }
205
206     @Override
207     public void forEach(final Consumer<FlowRegistryKey> consumer) {
208         synchronized (flowRegistry) {
209             flowRegistry.keySet().forEach(consumer);
210         }
211     }
212
213     @Override
214     public int size() {
215         return flowRegistry.size();
216     }
217
218     @Override
219     public void close() {
220         final Iterator<ListenableFuture<List<Optional<FlowCapableNode>>>> iterator = lastFillFutures.iterator();
221
222         // We need to force interrupt and clear all running futures that are trying to read flow IDs from data store
223         while (iterator.hasNext()) {
224             final ListenableFuture<List<Optional<FlowCapableNode>>> next = iterator.next();
225             boolean success = next.cancel(true);
226             LOG.trace("Cancelling filling flow registry with flows job {} with result: {}", next, success);
227             iterator.remove();
228         }
229
230         flowRegistry.clear();
231     }
232
233     @VisibleForTesting
234     static FlowId createAlienFlowId(final short tableId) {
235         final String alienId = ALIEN_SYSTEM_FLOW_ID + tableId + '-' + UNACCOUNTED_FLOWS_COUNTER.incrementAndGet();
236         LOG.debug("Created alien flow id {} for table id {}", alienId, tableId);
237         return new FlowId(alienId);
238     }
239
240     //Hashcode generation of the extension augmentation can differ for the same object received from the datastore and
241     // the one received after deserialization of switch message. OpenFlowplugin extensions are list, and the order in
242     // which it can receive the extensions back from switch can differ and that lead to a different hashcode. In that
243     // scenario, hashcode won't match and flowRegistry return the  related key. To overcome this issue, these methods
244     // make sure that key is stored only if it doesn't equals to any existing key.
245     private void addToFlowRegistry(final FlowRegistryKey flowRegistryKey, final FlowDescriptor flowDescriptor) {
246         FlowRegistryKey existingFlowRegistryKey = getExistingKey(flowRegistryKey);
247         if (existingFlowRegistryKey == null) {
248             flowRegistry.put(flowRegistryKey, flowDescriptor);
249         } else {
250             flowRegistry.put(existingFlowRegistryKey, flowDescriptor);
251         }
252     }
253
254     private void removeFromFlowRegistry(final FlowRegistryKey flowRegistryKey) {
255         FlowRegistryKey existingFlowRegistryKey = getExistingKey(flowRegistryKey);
256         if (existingFlowRegistryKey != null) {
257             flowRegistry.remove(existingFlowRegistryKey);
258         } else {
259             flowRegistry.remove(flowRegistryKey);
260         }
261     }
262
263     private FlowRegistryKey getExistingKey(final FlowRegistryKey flowRegistryKey) {
264         if (flowRegistryKey.getMatch().augmentation(GeneralAugMatchNodesNodeTableFlow.class) == null) {
265             if (flowRegistry.containsKey(flowRegistryKey)) {
266                 return flowRegistryKey;
267             }
268         } else {
269             synchronized (flowRegistry) {
270                 for (Map.Entry<FlowRegistryKey, FlowDescriptor> keyValueSet : flowRegistry.entrySet()) {
271                     if (keyValueSet.getKey().equals(flowRegistryKey)) {
272                         return keyValueSet.getKey();
273                     }
274                 }
275             }
276         }
277         return null;
278     }
279
280     @VisibleForTesting
281     Map<FlowRegistryKey, FlowDescriptor> getAllFlowDescriptors() {
282         return flowRegistry;
283     }
284 }