2 * Copyright (c) 2014, 2017 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.openflowplugin.applications.frm.impl;
10 import static org.opendaylight.openflowplugin.applications.frm.util.FrmUtil.buildGroupInstanceIdentifier;
11 import static org.opendaylight.openflowplugin.applications.frm.util.FrmUtil.getActiveBundle;
12 import static org.opendaylight.openflowplugin.applications.frm.util.FrmUtil.getFlowId;
13 import static org.opendaylight.openflowplugin.applications.frm.util.FrmUtil.getNodeIdFromNodeIdentifier;
14 import static org.opendaylight.openflowplugin.applications.frm.util.FrmUtil.isFlowDependentOnGroup;
15 import static org.opendaylight.openflowplugin.applications.frm.util.FrmUtil.isGroupExistsOnDevice;
17 import com.google.common.base.Optional;
18 import com.google.common.base.Preconditions;
19 import com.google.common.util.concurrent.FutureCallback;
20 import com.google.common.util.concurrent.Futures;
21 import com.google.common.util.concurrent.ListenableFuture;
22 import com.google.common.util.concurrent.MoreExecutors;
23 import com.google.common.util.concurrent.SettableFuture;
24 import java.util.concurrent.ExecutionException;
25 import java.util.concurrent.Future;
26 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
27 import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
28 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
29 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
30 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
31 import org.opendaylight.infrautils.utils.concurrent.JdkFutures;
32 import org.opendaylight.openflowplugin.applications.frm.ForwardingRulesManager;
33 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Uri;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowId;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.StaleFlow;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.StaleFlowBuilder;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.StaleFlowKey;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInput;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInputBuilder;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowOutput;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowTableRef;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowInputBuilder;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowOutput;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowInput;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowInputBuilder;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowOutput;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.flow.update.OriginalFlowBuilder;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.flow.update.UpdatedFlowBuilder;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.FlowRef;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.AddGroupInput;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.AddGroupInputBuilder;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.AddGroupOutput;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupRef;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.rev170124.BundleId;
64 import org.opendaylight.yangtools.concepts.ListenerRegistration;
65 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
66 import org.opendaylight.yangtools.yang.common.RpcError;
67 import org.opendaylight.yangtools.yang.common.RpcResult;
68 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
69 import org.slf4j.Logger;
70 import org.slf4j.LoggerFactory;
73 * FlowForwarder It implements
74 * {@link org.opendaylight.controller.md.sal.binding.api.DataTreeChangeListener}
75 * for WildCardedPath to {@link Flow} and ForwardingRulesCommiter interface for
76 * methods: add, update and remove {@link Flow} processing for
77 * {@link org.opendaylight.controller.md.sal.binding.api.DataTreeModification}.
79 public class FlowForwarder extends AbstractListeningCommiter<Flow> {
81 private static final Logger LOG = LoggerFactory.getLogger(FlowForwarder.class);
83 private static final String GROUP_EXISTS_IN_DEVICE_ERROR = "GROUPEXISTS";
85 private ListenerRegistration<FlowForwarder> listenerRegistration;
86 private final BundleFlowForwarder bundleFlowForwarder;
88 public FlowForwarder(final ForwardingRulesManager manager, final DataBroker db) {
90 bundleFlowForwarder = new BundleFlowForwarder(manager);
94 @SuppressWarnings("IllegalCatch")
95 public void registerListener() {
96 final DataTreeIdentifier<Flow> treeId = new DataTreeIdentifier<>(LogicalDatastoreType.CONFIGURATION,
99 listenerRegistration = dataBroker.registerDataTreeChangeListener(treeId, FlowForwarder.this);
100 } catch (final Exception e) {
101 LOG.warn("FRM Flow DataTreeChange listener registration fail!");
102 LOG.debug("FRM Flow DataTreeChange listener registration fail ..", e);
103 throw new IllegalStateException("FlowForwarder startup fail! System needs restart.", e);
109 public void deregisterListener() {
114 public void close() {
115 if (listenerRegistration != null) {
116 listenerRegistration.close();
117 listenerRegistration = null;
122 public void remove(final InstanceIdentifier<Flow> identifier, final Flow removeDataObj,
123 final InstanceIdentifier<FlowCapableNode> nodeIdent) {
125 final TableKey tableKey = identifier.firstKeyOf(Table.class);
126 if (tableIdValidationPrecondition(tableKey, removeDataObj)) {
127 BundleId bundleId = getActiveBundle(nodeIdent, provider);
128 if (bundleId != null) {
129 bundleFlowForwarder.remove(identifier, removeDataObj, nodeIdent, bundleId);
131 final RemoveFlowInputBuilder builder = new RemoveFlowInputBuilder(removeDataObj);
132 builder.setFlowRef(new FlowRef(identifier));
133 builder.setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)));
134 builder.setFlowTable(new FlowTableRef(nodeIdent.child(Table.class, tableKey)));
136 // This method is called only when a given flow object has been
137 // removed from datastore. So FRM always needs to set strict flag
138 // into remove-flow input so that only a flow entry associated with
139 // a given flow object is removed.
140 builder.setTransactionUri(new Uri(provider.getNewTransactionId())).setStrict(Boolean.TRUE);
141 final Future<RpcResult<RemoveFlowOutput>> resultFuture =
142 provider.getSalFlowService().removeFlow(builder.build());
143 JdkFutures.addErrorLogging(resultFuture, LOG, "removeFlow");
148 // TODO: Pull this into ForwardingRulesCommiter and override it here
151 public Future<RpcResult<RemoveFlowOutput>> removeWithResult(final InstanceIdentifier<Flow> identifier,
152 final Flow removeDataObj, final InstanceIdentifier<FlowCapableNode> nodeIdent) {
154 Future<RpcResult<RemoveFlowOutput>> resultFuture = SettableFuture.create();
155 final TableKey tableKey = identifier.firstKeyOf(Table.class);
156 if (tableIdValidationPrecondition(tableKey, removeDataObj)) {
157 final RemoveFlowInputBuilder builder = new RemoveFlowInputBuilder(removeDataObj);
158 builder.setFlowRef(new FlowRef(identifier));
159 builder.setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)));
160 builder.setFlowTable(new FlowTableRef(nodeIdent.child(Table.class, tableKey)));
162 // This method is called only when a given flow object has been
163 // removed from datastore. So FRM always needs to set strict flag
164 // into remove-flow input so that only a flow entry associated with
165 // a given flow object is removed.
166 builder.setTransactionUri(new Uri(provider.getNewTransactionId())).setStrict(Boolean.TRUE);
167 resultFuture = provider.getSalFlowService().removeFlow(builder.build());
174 public void update(final InstanceIdentifier<Flow> identifier, final Flow original, final Flow update,
175 final InstanceIdentifier<FlowCapableNode> nodeIdent) {
177 final TableKey tableKey = identifier.firstKeyOf(Table.class);
178 if (tableIdValidationPrecondition(tableKey, update)) {
179 BundleId bundleId = getActiveBundle(nodeIdent, provider);
180 if (bundleId != null) {
181 bundleFlowForwarder.update(identifier, original, update, nodeIdent, bundleId);
183 final NodeId nodeId = getNodeIdFromNodeIdentifier(nodeIdent);
184 nodeConfigurator.enqueueJob(nodeId.getValue(), () -> {
185 final UpdateFlowInputBuilder builder = new UpdateFlowInputBuilder();
186 builder.setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)));
187 builder.setFlowRef(new FlowRef(identifier));
188 builder.setTransactionUri(new Uri(provider.getNewTransactionId()));
190 // This method is called only when a given flow object in datastore
191 // has been updated. So FRM always needs to set strict flag into
192 // update-flow input so that only a flow entry associated with
193 // a given flow object is updated.
194 builder.setUpdatedFlow(new UpdatedFlowBuilder(update).setStrict(Boolean.TRUE).build());
195 builder.setOriginalFlow(new OriginalFlowBuilder(original).setStrict(Boolean.TRUE).build());
197 Long groupId = isFlowDependentOnGroup(update);
198 if (groupId != null) {
199 LOG.trace("The flow {} is dependent on group {}. Checking if the group is already present",
200 getFlowId(new FlowRef(identifier)), groupId);
201 if (isGroupExistsOnDevice(nodeIdent, groupId, provider)) {
202 LOG.trace("The dependent group {} is already programmed. Updating the flow {}", groupId,
203 getFlowId(new FlowRef(identifier)));
204 return provider.getSalFlowService().updateFlow(builder.build());
206 LOG.trace("The dependent group {} isn't programmed yet. Pushing the group", groupId);
207 ListenableFuture<RpcResult<AddGroupOutput>> groupFuture = pushDependentGroup(nodeIdent,
209 SettableFuture<RpcResult<UpdateFlowOutput>> resultFuture = SettableFuture.create();
210 Futures.addCallback(groupFuture,
211 new UpdateFlowCallBack(builder.build(), nodeId, resultFuture, groupId),
212 MoreExecutors.directExecutor());
217 LOG.trace("The flow {} is not dependent on any group. Updating the flow",
218 getFlowId(new FlowRef(identifier)));
219 return provider.getSalFlowService().updateFlow(builder.build());
226 public Future<? extends RpcResult<?>> add(final InstanceIdentifier<Flow> identifier, final Flow addDataObj,
227 final InstanceIdentifier<FlowCapableNode> nodeIdent) {
229 final TableKey tableKey = identifier.firstKeyOf(Table.class);
230 if (tableIdValidationPrecondition(tableKey, addDataObj)) {
231 BundleId bundleId = getActiveBundle(nodeIdent, provider);
232 if (bundleId != null) {
233 return bundleFlowForwarder.add(identifier, addDataObj, nodeIdent, bundleId);
235 final NodeId nodeId = getNodeIdFromNodeIdentifier(nodeIdent);
236 nodeConfigurator.enqueueJob(nodeId.getValue(), () -> {
237 final AddFlowInputBuilder builder = new AddFlowInputBuilder(addDataObj);
239 builder.setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)));
240 builder.setFlowRef(new FlowRef(identifier));
241 builder.setFlowTable(new FlowTableRef(nodeIdent.child(Table.class, tableKey)));
242 builder.setTransactionUri(new Uri(provider.getNewTransactionId()));
243 Long groupId = isFlowDependentOnGroup(addDataObj);
244 if (groupId != null) {
245 LOG.trace("The flow {} is dependent on group {}. Checking if the group is already present",
246 getFlowId(new FlowRef(identifier)), groupId);
247 if (isGroupExistsOnDevice(nodeIdent, groupId, provider)) {
248 LOG.trace("The dependent group {} is already programmed. Adding the flow {}", groupId,
249 getFlowId(new FlowRef(identifier)));
250 return provider.getSalFlowService().addFlow(builder.build());
252 LOG.trace("The dependent group {} isn't programmed yet. Pushing the group", groupId);
253 ListenableFuture<RpcResult<AddGroupOutput>> groupFuture = pushDependentGroup(nodeIdent,
255 SettableFuture<RpcResult<AddFlowOutput>> resultFuture = SettableFuture.create();
256 Futures.addCallback(groupFuture, new AddFlowCallBack(builder.build(), nodeId, groupId,
257 resultFuture), MoreExecutors.directExecutor());
262 LOG.trace("The flow {} is not dependent on any group. Adding the flow",
263 getFlowId(new FlowRef(identifier)));
264 return provider.getSalFlowService().addFlow(builder.build());
268 return Futures.immediateFuture(null);
272 public void createStaleMarkEntity(InstanceIdentifier<Flow> identifier, Flow del,
273 InstanceIdentifier<FlowCapableNode> nodeIdent) {
274 LOG.debug("Creating Stale-Mark entry for the switch {} for flow {} ", nodeIdent.toString(), del.toString());
275 StaleFlow staleFlow = makeStaleFlow(identifier, del, nodeIdent);
276 persistStaleFlow(staleFlow, nodeIdent);
280 protected InstanceIdentifier<Flow> getWildCardPath() {
281 return InstanceIdentifier.create(Nodes.class).child(Node.class).augmentation(FlowCapableNode.class)
282 .child(Table.class).child(Flow.class);
285 private static boolean tableIdValidationPrecondition(final TableKey tableKey, final Flow flow) {
286 Preconditions.checkNotNull(tableKey, "TableKey can not be null or empty!");
287 Preconditions.checkNotNull(flow, "Flow can not be null or empty!");
288 if (!tableKey.getId().equals(flow.getTableId())) {
289 LOG.warn("TableID in URI tableId={} and in palyload tableId={} is not same.", flow.getTableId(),
296 private StaleFlow makeStaleFlow(InstanceIdentifier<Flow> identifier, Flow del,
297 InstanceIdentifier<FlowCapableNode> nodeIdent) {
298 StaleFlowBuilder staleFlowBuilder = new StaleFlowBuilder(del);
299 return staleFlowBuilder.setId(del.getId()).build();
302 private void persistStaleFlow(StaleFlow staleFlow, InstanceIdentifier<FlowCapableNode> nodeIdent) {
303 WriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
304 writeTransaction.put(LogicalDatastoreType.CONFIGURATION, getStaleFlowInstanceIdentifier(staleFlow, nodeIdent),
307 ListenableFuture<Void> submitFuture = writeTransaction.submit();
308 handleStaleFlowResultFuture(submitFuture);
311 private void handleStaleFlowResultFuture(ListenableFuture<Void> submitFuture) {
312 Futures.addCallback(submitFuture, new FutureCallback<Void>() {
314 public void onSuccess(Void result) {
315 LOG.debug("Stale Flow creation success");
319 public void onFailure(Throwable throwable) {
320 LOG.error("Stale Flow creation failed {}", throwable);
322 }, MoreExecutors.directExecutor());
326 private InstanceIdentifier<org.opendaylight.yang.gen.v1.urn.opendaylight
327 .flow.inventory.rev130819.tables.table.StaleFlow> getStaleFlowInstanceIdentifier(
328 StaleFlow staleFlow, InstanceIdentifier<FlowCapableNode> nodeIdent) {
329 return nodeIdent.child(Table.class, new TableKey(staleFlow.getTableId())).child(
330 org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.StaleFlow.class,
331 new StaleFlowKey(new FlowId(staleFlow.getId())));
334 private ListenableFuture<RpcResult<AddGroupOutput>> pushDependentGroup(
335 final InstanceIdentifier<FlowCapableNode> nodeIdent, final Long groupId) {
337 //TODO This read to the DS might have a performance impact.
338 //if the dependent group is not installed than we should just cache the parent group,
339 //till we receive the dependent group DTCN and then push it.
341 InstanceIdentifier<Group> groupIdent = buildGroupInstanceIdentifier(nodeIdent, groupId);
342 ListenableFuture<RpcResult<AddGroupOutput>> resultFuture;
343 LOG.info("Reading the group from config inventory: {}", groupId);
344 try (ReadOnlyTransaction readTransaction = provider.getReadTransaction()) {
345 Optional<Group> group = readTransaction.read(LogicalDatastoreType.CONFIGURATION, groupIdent).get();
346 if (group.isPresent()) {
347 final AddGroupInputBuilder builder = new AddGroupInputBuilder(group.get());
348 builder.setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)));
349 builder.setGroupRef(new GroupRef(nodeIdent));
350 builder.setTransactionUri(new Uri(provider.getNewTransactionId()));
351 AddGroupInput addGroupInput = builder.build();
352 resultFuture = this.provider.getSalGroupService().addGroup(addGroupInput);
354 resultFuture = Futures.immediateFuture(RpcResultBuilder.<AddGroupOutput>failed()
355 .withError(RpcError.ErrorType.APPLICATION,
356 "Group " + groupId + " not present in the config inventory").build());
358 } catch (InterruptedException | ExecutionException e) {
359 LOG.error("Error while reading group from config datastore for the group ID {}", groupId, e);
360 resultFuture = Futures.immediateFuture(RpcResultBuilder.<AddGroupOutput>failed()
361 .withError(RpcError.ErrorType.APPLICATION,
362 "Error while reading group " + groupId + " from inventory").build());
367 private final class AddFlowCallBack implements FutureCallback<RpcResult<AddGroupOutput>> {
368 private final AddFlowInput addFlowInput;
369 private final NodeId nodeId;
370 private final Long groupId;
371 private final SettableFuture<RpcResult<AddFlowOutput>> resultFuture;
373 private AddFlowCallBack(final AddFlowInput addFlowInput, final NodeId nodeId, Long groupId,
374 SettableFuture<RpcResult<AddFlowOutput>> resultFuture) {
375 this.addFlowInput = addFlowInput;
376 this.nodeId = nodeId;
377 this.groupId = groupId;
378 this.resultFuture = resultFuture;
382 public void onSuccess(RpcResult<AddGroupOutput> rpcResult) {
383 if (rpcResult.isSuccessful() || rpcResult.getErrors().size() == 1
384 && rpcResult.getErrors().iterator().next().getMessage().contains(GROUP_EXISTS_IN_DEVICE_ERROR)) {
385 provider.getDevicesGroupRegistry().storeGroup(nodeId, groupId);
386 Futures.addCallback(provider.getSalFlowService().addFlow(addFlowInput),
387 new FutureCallback<RpcResult<AddFlowOutput>>() {
389 public void onSuccess(RpcResult<AddFlowOutput> result) {
390 resultFuture.set(result);
394 public void onFailure(Throwable failure) {
395 resultFuture.setException(failure);
397 }, MoreExecutors.directExecutor());
399 LOG.debug("Flow add with id {} finished without error for node {}",
400 getFlowId(addFlowInput.getFlowRef()), nodeId);
402 LOG.error("Flow add with id {} failed for node {} with error {}", getFlowId(addFlowInput.getFlowRef()),
403 nodeId, rpcResult.getErrors().toString());
404 resultFuture.set(RpcResultBuilder.<AddFlowOutput>failed()
405 .withRpcErrors(rpcResult.getErrors()).build());
410 public void onFailure(Throwable throwable) {
411 LOG.error("Service call for adding flow with id {} failed for node {}",
412 getFlowId(addFlowInput.getFlowRef()), nodeId, throwable);
413 resultFuture.setException(throwable);
417 private final class UpdateFlowCallBack implements FutureCallback<RpcResult<AddGroupOutput>> {
418 private final UpdateFlowInput updateFlowInput;
419 private final NodeId nodeId;
420 private final Long groupId;
421 private final SettableFuture<RpcResult<UpdateFlowOutput>> resultFuture;
423 private UpdateFlowCallBack(final UpdateFlowInput updateFlowInput, final NodeId nodeId,
424 SettableFuture<RpcResult<UpdateFlowOutput>> resultFuture, Long groupId) {
425 this.updateFlowInput = updateFlowInput;
426 this.nodeId = nodeId;
427 this.groupId = groupId;
428 this.resultFuture = resultFuture;
432 public void onSuccess(RpcResult<AddGroupOutput> rpcResult) {
433 if (rpcResult.isSuccessful() || rpcResult.getErrors().size() == 1
434 && rpcResult.getErrors().iterator().next().getMessage().contains(GROUP_EXISTS_IN_DEVICE_ERROR)) {
435 provider.getDevicesGroupRegistry().storeGroup(nodeId, groupId);
436 Futures.addCallback(provider.getSalFlowService().updateFlow(updateFlowInput),
437 new FutureCallback<RpcResult<UpdateFlowOutput>>() {
439 public void onSuccess(RpcResult<UpdateFlowOutput> result) {
440 resultFuture.set(result);
444 public void onFailure(Throwable failure) {
445 resultFuture.setException(failure);
447 }, MoreExecutors.directExecutor());
449 LOG.debug("Flow update with id {} finished without error for node {}",
450 getFlowId(updateFlowInput.getFlowRef()), nodeId);
452 LOG.error("Flow update with id {} failed for node {} with error {}",
453 getFlowId(updateFlowInput.getFlowRef()), nodeId, rpcResult.getErrors().toString());
454 resultFuture.set(RpcResultBuilder.<UpdateFlowOutput>failed()
455 .withRpcErrors(rpcResult.getErrors()).build());
460 public void onFailure(Throwable throwable) {
461 LOG.error("Service call for updating flow with id {} failed for node {}",
462 getFlowId(updateFlowInput.getFlowRef()), nodeId, throwable);
463 resultFuture.setException(throwable);