2 * Copyright (c) 2014, 2017 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.openflowplugin.applications.frm.impl;
10 import static java.util.Objects.requireNonNull;
11 import static org.opendaylight.openflowplugin.applications.frm.util.FrmUtil.buildGroupInstanceIdentifier;
12 import static org.opendaylight.openflowplugin.applications.frm.util.FrmUtil.getActiveBundle;
13 import static org.opendaylight.openflowplugin.applications.frm.util.FrmUtil.getFlowId;
14 import static org.opendaylight.openflowplugin.applications.frm.util.FrmUtil.getNodeIdValueFromNodeIdentifier;
15 import static org.opendaylight.openflowplugin.applications.frm.util.FrmUtil.isFlowDependentOnGroup;
16 import static org.opendaylight.openflowplugin.applications.frm.util.FrmUtil.isGroupExistsOnDevice;
18 import com.google.common.util.concurrent.FutureCallback;
19 import com.google.common.util.concurrent.Futures;
20 import com.google.common.util.concurrent.ListenableFuture;
21 import com.google.common.util.concurrent.MoreExecutors;
22 import com.google.common.util.concurrent.SettableFuture;
23 import java.util.Optional;
24 import java.util.concurrent.ExecutionException;
25 import org.opendaylight.infrautils.utils.concurrent.LoggingFutures;
26 import org.opendaylight.mdsal.binding.api.DataBroker;
27 import org.opendaylight.mdsal.binding.api.ReadTransaction;
28 import org.opendaylight.mdsal.binding.api.WriteTransaction;
29 import org.opendaylight.mdsal.common.api.CommitInfo;
30 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
31 import org.opendaylight.openflowplugin.applications.frm.ForwardingRulesManager;
32 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Uri;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowId;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.StaleFlow;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.StaleFlowBuilder;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.StaleFlowKey;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInput;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInputBuilder;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowOutput;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowTableRef;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowInputBuilder;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowOutput;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowInput;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowInputBuilder;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowOutput;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.flow.update.OriginalFlowBuilder;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.flow.update.UpdatedFlowBuilder;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.FlowRef;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.AddGroupInput;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.AddGroupInputBuilder;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.AddGroupOutput;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupRef;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.rev170124.BundleId;
62 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
63 import org.opendaylight.yangtools.yang.common.ErrorType;
64 import org.opendaylight.yangtools.yang.common.RpcResult;
65 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
66 import org.opendaylight.yangtools.yang.common.Uint32;
67 import org.slf4j.Logger;
68 import org.slf4j.LoggerFactory;
71 * FlowForwarder It implements
72 * {@link org.opendaylight.mdsal.binding.api.DataTreeChangeListener}
73 * for WildCardedPath to {@link Flow} and ForwardingRulesCommiter interface for
74 * methods: add, update and remove {@link Flow} processing for
75 * {@link org.opendaylight.mdsal.binding.api.DataTreeModification}.
77 public class FlowForwarder extends AbstractListeningCommiter<Flow> {
78 private static final Logger LOG = LoggerFactory.getLogger(FlowForwarder.class);
79 private static final String GROUP_EXISTS_IN_DEVICE_ERROR = "GROUPEXISTS";
81 public FlowForwarder(final ForwardingRulesManager manager, final DataBroker db,
82 final ListenerRegistrationHelper registrationHelper) {
83 super(manager, db, registrationHelper);
87 public void remove(final InstanceIdentifier<Flow> identifier, final Flow removeDataObj,
88 final InstanceIdentifier<FlowCapableNode> nodeIdent) {
90 final TableKey tableKey = identifier.firstKeyOf(Table.class);
91 if (tableIdValidationPrecondition(tableKey, removeDataObj)) {
92 BundleId bundleId = getActiveBundle(nodeIdent, provider);
93 if (bundleId != null) {
94 provider.getBundleFlowListener().remove(identifier, removeDataObj, nodeIdent, bundleId);
96 final String nodeId = getNodeIdValueFromNodeIdentifier(nodeIdent);
97 nodeConfigurator.enqueueJob(nodeId, () -> {
98 final RemoveFlowInputBuilder builder = new RemoveFlowInputBuilder(removeDataObj);
99 builder.setFlowRef(new FlowRef(identifier));
100 builder.setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)));
101 builder.setFlowTable(new FlowTableRef(nodeIdent.child(Table.class, tableKey)));
103 // This method is called only when a given flow object has been
104 // removed from datastore. So FRM always needs to set strict flag
105 // into remove-flow input so that only a flow entry associated with
106 // a given flow object is removed.
107 builder.setTransactionUri(new Uri(provider.getNewTransactionId())).setStrict(Boolean.TRUE);
108 final ListenableFuture<RpcResult<RemoveFlowOutput>> resultFuture =
109 provider.getSalFlowService().removeFlow(builder.build());
110 LoggingFutures.addErrorLogging(resultFuture, LOG, "removeFlow");
117 // TODO: Pull this into ForwardingRulesCommiter and override it here
120 public ListenableFuture<RpcResult<RemoveFlowOutput>> removeWithResult(final InstanceIdentifier<Flow> identifier,
121 final Flow removeDataObj, final InstanceIdentifier<FlowCapableNode> nodeIdent) {
123 ListenableFuture<RpcResult<RemoveFlowOutput>> resultFuture = SettableFuture.create();
124 final TableKey tableKey = identifier.firstKeyOf(Table.class);
125 if (tableIdValidationPrecondition(tableKey, removeDataObj)) {
126 final RemoveFlowInputBuilder builder = new RemoveFlowInputBuilder(removeDataObj);
127 builder.setFlowRef(new FlowRef(identifier));
128 builder.setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)));
129 builder.setFlowTable(new FlowTableRef(nodeIdent.child(Table.class, tableKey)));
131 // This method is called only when a given flow object has been
132 // removed from datastore. So FRM always needs to set strict flag
133 // into remove-flow input so that only a flow entry associated with
134 // a given flow object is removed.
135 builder.setTransactionUri(new Uri(provider.getNewTransactionId())).setStrict(Boolean.TRUE);
136 resultFuture = provider.getSalFlowService().removeFlow(builder.build());
143 public void update(final InstanceIdentifier<Flow> identifier, final Flow original, final Flow update,
144 final InstanceIdentifier<FlowCapableNode> nodeIdent) {
146 final TableKey tableKey = identifier.firstKeyOf(Table.class);
147 if (tableIdValidationPrecondition(tableKey, update)) {
148 BundleId bundleId = getActiveBundle(nodeIdent, provider);
149 if (bundleId != null) {
150 provider.getBundleFlowListener().update(identifier, original, update, nodeIdent, bundleId);
152 final String nodeId = getNodeIdValueFromNodeIdentifier(nodeIdent);
153 nodeConfigurator.enqueueJob(nodeId, () -> {
154 final UpdateFlowInputBuilder builder = new UpdateFlowInputBuilder();
155 builder.setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)));
156 builder.setFlowRef(new FlowRef(identifier));
157 builder.setTransactionUri(new Uri(provider.getNewTransactionId()));
159 // This method is called only when a given flow object in datastore
160 // has been updated. So FRM always needs to set strict flag into
161 // update-flow input so that only a flow entry associated with
162 // a given flow object is updated.
163 builder.setUpdatedFlow(new UpdatedFlowBuilder(update).setStrict(Boolean.TRUE).build());
164 builder.setOriginalFlow(new OriginalFlowBuilder(original).setStrict(Boolean.TRUE).build());
166 Uint32 groupId = isFlowDependentOnGroup(update);
167 if (groupId != null) {
168 LOG.trace("The flow {} is dependent on group {}. Checking if the group is already present",
169 getFlowId(identifier), groupId);
170 if (isGroupExistsOnDevice(nodeIdent, groupId, provider)) {
171 LOG.trace("The dependent group {} is already programmed. Updating the flow {}", groupId,
172 getFlowId(identifier));
173 return provider.getSalFlowService().updateFlow(builder.build());
175 LOG.trace("The dependent group {} isn't programmed yet. Pushing the group", groupId);
176 ListenableFuture<RpcResult<AddGroupOutput>> groupFuture = pushDependentGroup(nodeIdent,
178 SettableFuture<RpcResult<UpdateFlowOutput>> resultFuture = SettableFuture.create();
179 Futures.addCallback(groupFuture,
180 new UpdateFlowCallBack(builder.build(), nodeId, resultFuture, groupId),
181 MoreExecutors.directExecutor());
186 LOG.trace("The flow {} is not dependent on any group. Updating the flow",
187 getFlowId(identifier));
188 return provider.getSalFlowService().updateFlow(builder.build());
195 public ListenableFuture<? extends RpcResult<?>> add(final InstanceIdentifier<Flow> identifier,
196 final Flow addDataObj, final InstanceIdentifier<FlowCapableNode> nodeIdent) {
197 final var tableKey = identifier.firstKeyOf(Table.class);
198 if (!tableIdValidationPrecondition(tableKey, addDataObj)) {
199 return Futures.immediateFuture(null);
201 final var bundleId = getActiveBundle(nodeIdent, provider);
202 if (bundleId != null) {
203 return provider.getBundleFlowListener().add(identifier, addDataObj, nodeIdent, bundleId);
206 final String nodeId = getNodeIdValueFromNodeIdentifier(nodeIdent);
207 return nodeConfigurator.enqueueJob(nodeId, () -> {
208 final var builder = new AddFlowInputBuilder(addDataObj)
209 .setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)))
210 .setFlowRef(new FlowRef(identifier))
211 .setFlowTable(new FlowTableRef(nodeIdent.child(Table.class, tableKey)))
212 .setTransactionUri(new Uri(provider.getNewTransactionId()));
213 final var groupId = isFlowDependentOnGroup(addDataObj);
214 if (groupId != null) {
215 LOG.trace("The flow {} is dependent on group {}. Checking if the group is already present",
216 getFlowId(new FlowRef(identifier)), groupId);
217 if (isGroupExistsOnDevice(nodeIdent, groupId, provider)) {
218 LOG.trace("The dependent group {} is already programmed. Adding the flow {}", groupId,
219 getFlowId(new FlowRef(identifier)));
220 return provider.getSalFlowService().addFlow(builder.build());
223 LOG.trace("The dependent group {} isn't programmed yet. Pushing the group", groupId);
224 final var groupFuture = pushDependentGroup(nodeIdent, groupId);
225 final var resultFuture = SettableFuture.<RpcResult<AddFlowOutput>>create();
226 Futures.addCallback(groupFuture, new AddFlowCallBack(builder.build(), nodeId, groupId,
227 resultFuture), MoreExecutors.directExecutor());
231 LOG.trace("The flow {} is not dependent on any group. Adding the flow",
232 getFlowId(new FlowRef(identifier)));
233 return provider.getSalFlowService().addFlow(builder.build());
238 public void createStaleMarkEntity(final InstanceIdentifier<Flow> identifier, final Flow del,
239 final InstanceIdentifier<FlowCapableNode> nodeIdent) {
240 LOG.debug("Creating Stale-Mark entry for the switch {} for flow {} ", nodeIdent, del);
241 StaleFlow staleFlow = makeStaleFlow(identifier, del, nodeIdent);
242 persistStaleFlow(staleFlow, nodeIdent);
246 protected InstanceIdentifier<Flow> getWildCardPath() {
247 return InstanceIdentifier.create(Nodes.class).child(Node.class).augmentation(FlowCapableNode.class)
248 .child(Table.class).child(Flow.class);
251 private static boolean tableIdValidationPrecondition(final TableKey tableKey, final Flow flow) {
252 requireNonNull(tableKey, "TableKey can not be null or empty!");
253 requireNonNull(flow, "Flow can not be null or empty!");
254 if (!tableKey.getId().equals(flow.getTableId())) {
255 LOG.warn("TableID in URI tableId={} and in palyload tableId={} is not same.", flow.getTableId(),
262 private static StaleFlow makeStaleFlow(final InstanceIdentifier<Flow> identifier, final Flow del,
263 final InstanceIdentifier<FlowCapableNode> nodeIdent) {
264 StaleFlowBuilder staleFlowBuilder = new StaleFlowBuilder(del);
265 return staleFlowBuilder.setId(del.getId()).build();
268 private void persistStaleFlow(final StaleFlow staleFlow, final InstanceIdentifier<FlowCapableNode> nodeIdent) {
269 WriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
270 writeTransaction.put(LogicalDatastoreType.CONFIGURATION, getStaleFlowInstanceIdentifier(staleFlow, nodeIdent),
273 writeTransaction.commit().addCallback(new FutureCallback<CommitInfo>() {
275 public void onSuccess(final CommitInfo result) {
276 LOG.debug("Stale Flow creation success");
280 public void onFailure(final Throwable throwable) {
281 LOG.error("Stale Flow creation failed", throwable);
283 }, MoreExecutors.directExecutor());
286 private static InstanceIdentifier<org.opendaylight.yang.gen.v1.urn.opendaylight
287 .flow.inventory.rev130819.tables.table.StaleFlow> getStaleFlowInstanceIdentifier(
288 final StaleFlow staleFlow, final InstanceIdentifier<FlowCapableNode> nodeIdent) {
289 return nodeIdent.child(Table.class, new TableKey(staleFlow.getTableId())).child(
290 org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.StaleFlow.class,
291 new StaleFlowKey(new FlowId(staleFlow.getId())));
294 private ListenableFuture<RpcResult<AddGroupOutput>> pushDependentGroup(
295 final InstanceIdentifier<FlowCapableNode> nodeIdent, final Uint32 groupId) {
297 //TODO This read to the DS might have a performance impact.
298 //if the dependent group is not installed than we should just cache the parent group,
299 //till we receive the dependent group DTCN and then push it.
301 InstanceIdentifier<Group> groupIdent = buildGroupInstanceIdentifier(nodeIdent, groupId);
302 ListenableFuture<RpcResult<AddGroupOutput>> resultFuture;
303 LOG.info("Reading the group from config inventory: {}", groupId);
304 try (ReadTransaction readTransaction = provider.getReadTransaction()) {
305 Optional<Group> group = readTransaction.read(LogicalDatastoreType.CONFIGURATION, groupIdent).get();
306 if (group.isPresent()) {
307 final AddGroupInputBuilder builder = new AddGroupInputBuilder(group.orElseThrow());
308 builder.setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)));
309 builder.setGroupRef(new GroupRef(nodeIdent));
310 builder.setTransactionUri(new Uri(provider.getNewTransactionId()));
311 AddGroupInput addGroupInput = builder.build();
312 resultFuture = provider.getSalGroupService().addGroup(addGroupInput);
314 resultFuture = RpcResultBuilder.<AddGroupOutput>failed()
315 .withError(ErrorType.APPLICATION,
316 "Group " + groupId + " not present in the config inventory").buildFuture();
318 } catch (InterruptedException | ExecutionException e) {
319 LOG.error("Error while reading group from config datastore for the group ID {}", groupId, e);
320 resultFuture = RpcResultBuilder.<AddGroupOutput>failed()
321 .withError(ErrorType.APPLICATION,
322 "Error while reading group " + groupId + " from inventory").buildFuture();
327 private final class AddFlowCallBack implements FutureCallback<RpcResult<AddGroupOutput>> {
328 private final AddFlowInput addFlowInput;
329 private final String nodeId;
330 private final Uint32 groupId;
331 private final SettableFuture<RpcResult<AddFlowOutput>> resultFuture;
333 private AddFlowCallBack(final AddFlowInput addFlowInput, final String nodeId, final Uint32 groupId,
334 final SettableFuture<RpcResult<AddFlowOutput>> resultFuture) {
335 this.addFlowInput = addFlowInput;
336 this.nodeId = nodeId;
337 this.groupId = groupId;
338 this.resultFuture = resultFuture;
342 public void onSuccess(final RpcResult<AddGroupOutput> rpcResult) {
343 if (rpcResult.isSuccessful() || rpcResult.getErrors().size() == 1
344 && rpcResult.getErrors().iterator().next().getMessage().contains(GROUP_EXISTS_IN_DEVICE_ERROR)) {
345 provider.getDevicesGroupRegistry().storeGroup(nodeId, groupId);
346 Futures.addCallback(provider.getSalFlowService().addFlow(addFlowInput),
347 new FutureCallback<RpcResult<AddFlowOutput>>() {
349 public void onSuccess(final RpcResult<AddFlowOutput> result) {
350 resultFuture.set(result);
354 public void onFailure(final Throwable failure) {
355 resultFuture.setException(failure);
357 }, MoreExecutors.directExecutor());
359 LOG.debug("Flow add with id {} finished without error for node {}",
360 getFlowId(addFlowInput.getFlowRef()), nodeId);
362 LOG.error("Flow add with id {} failed for node {} with error {}", getFlowId(addFlowInput.getFlowRef()),
363 nodeId, rpcResult.getErrors());
364 resultFuture.set(RpcResultBuilder.<AddFlowOutput>failed()
365 .withRpcErrors(rpcResult.getErrors()).build());
370 public void onFailure(final Throwable throwable) {
371 LOG.error("Service call for adding flow with id {} failed for node {}",
372 getFlowId(addFlowInput.getFlowRef()), nodeId, throwable);
373 resultFuture.setException(throwable);
377 private final class UpdateFlowCallBack implements FutureCallback<RpcResult<AddGroupOutput>> {
378 private final UpdateFlowInput updateFlowInput;
379 private final String nodeId;
380 private final Uint32 groupId;
381 private final SettableFuture<RpcResult<UpdateFlowOutput>> resultFuture;
383 private UpdateFlowCallBack(final UpdateFlowInput updateFlowInput, final String nodeId,
384 final SettableFuture<RpcResult<UpdateFlowOutput>> resultFuture, final Uint32 groupId) {
385 this.updateFlowInput = updateFlowInput;
386 this.nodeId = nodeId;
387 this.groupId = groupId;
388 this.resultFuture = resultFuture;
392 public void onSuccess(final RpcResult<AddGroupOutput> rpcResult) {
393 if (rpcResult.isSuccessful() || rpcResult.getErrors().size() == 1
394 && rpcResult.getErrors().iterator().next().getMessage().contains(GROUP_EXISTS_IN_DEVICE_ERROR)) {
395 provider.getDevicesGroupRegistry().storeGroup(nodeId, groupId);
396 Futures.addCallback(provider.getSalFlowService().updateFlow(updateFlowInput),
397 new FutureCallback<RpcResult<UpdateFlowOutput>>() {
399 public void onSuccess(final RpcResult<UpdateFlowOutput> result) {
400 resultFuture.set(result);
404 public void onFailure(final Throwable failure) {
405 resultFuture.setException(failure);
407 }, MoreExecutors.directExecutor());
409 LOG.debug("Flow update with id {} finished without error for node {}",
410 getFlowId(updateFlowInput.getFlowRef()), nodeId);
412 LOG.error("Flow update with id {} failed for node {} with error {}",
413 getFlowId(updateFlowInput.getFlowRef()), nodeId, rpcResult.getErrors());
414 resultFuture.set(RpcResultBuilder.<UpdateFlowOutput>failed()
415 .withRpcErrors(rpcResult.getErrors()).build());
420 public void onFailure(final Throwable throwable) {
421 LOG.error("Service call for updating flow with id {} failed for node {}",
422 getFlowId(updateFlowInput.getFlowRef()), nodeId, throwable);
423 resultFuture.setException(throwable);