/** * Copyright (c) 2014, 2017 Cisco Systems, Inc. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ package org.opendaylight.openflowplugin.applications.frm.impl; import static org.opendaylight.openflowplugin.applications.frm.util.FrmUtil.buildGroupInstanceIdentifier; import static org.opendaylight.openflowplugin.applications.frm.util.FrmUtil.getActiveBundle; import static org.opendaylight.openflowplugin.applications.frm.util.FrmUtil.getFlowId; import static org.opendaylight.openflowplugin.applications.frm.util.FrmUtil.getNodeIdFromNodeIdentifier; import static org.opendaylight.openflowplugin.applications.frm.util.FrmUtil.isFlowDependentOnGroup; import static org.opendaylight.openflowplugin.applications.frm.util.FrmUtil.isGroupExistsOnDevice; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.SettableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import org.opendaylight.controller.md.sal.binding.api.DataBroker; import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier; import org.opendaylight.controller.md.sal.binding.api.WriteTransaction; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; import org.opendaylight.infrautils.utils.concurrent.JdkFutures; import org.opendaylight.openflowplugin.applications.frm.ForwardingRulesManager; import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Uri; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowId; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.StaleFlow; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.StaleFlowBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.StaleFlowKey; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowTableRef; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowInputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowInputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.flow.update.OriginalFlowBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.flow.update.UpdatedFlowBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.FlowRef; import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.AddGroupInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.AddGroupInputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.AddGroupOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupRef; import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.rev170124.BundleId; import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; import org.opendaylight.yangtools.yang.common.RpcError; import org.opendaylight.yangtools.yang.common.RpcResult; import org.opendaylight.yangtools.yang.common.RpcResultBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * FlowForwarder It implements * {@link org.opendaylight.controller.md.sal.binding.api.DataTreeChangeListener} * for WildCardedPath to {@link Flow} and ForwardingRulesCommiter interface for * methods: add, update and remove {@link Flow} processing for * {@link org.opendaylight.controller.md.sal.binding.api.DataTreeModification}. */ public class FlowForwarder extends AbstractListeningCommiter { private static final Logger LOG = LoggerFactory.getLogger(FlowForwarder.class); private static final String GROUP_EXISTS_IN_DEVICE_ERROR = "GROUPEXISTS"; private ListenerRegistration listenerRegistration; private final BundleFlowForwarder bundleFlowForwarder; public FlowForwarder(final ForwardingRulesManager manager, final DataBroker db) { super(manager, db); bundleFlowForwarder = new BundleFlowForwarder(manager); } @SuppressWarnings("IllegalCatch") @Override public void registerListener() { final DataTreeIdentifier treeId = new DataTreeIdentifier<>(LogicalDatastoreType.CONFIGURATION, getWildCardPath()); try { listenerRegistration = dataBroker.registerDataTreeChangeListener(treeId, FlowForwarder.this); } catch (final Exception e) { LOG.warn("FRM Flow DataTreeChange listener registration fail!"); LOG.debug("FRM Flow DataTreeChange listener registration fail ..", e); throw new IllegalStateException("FlowForwarder startup fail! System needs restart.", e); } } @Override public void deregisterListener() { close(); } @Override public void close() { if (listenerRegistration != null) { listenerRegistration.close(); listenerRegistration = null; } } @Override public void remove(final InstanceIdentifier identifier, final Flow removeDataObj, final InstanceIdentifier nodeIdent) { final TableKey tableKey = identifier.firstKeyOf(Table.class, TableKey.class); if (tableIdValidationPrecondition(tableKey, removeDataObj)) { BundleId bundleId = getActiveBundle(nodeIdent, provider); if (bundleId != null) { bundleFlowForwarder.remove(identifier, removeDataObj, nodeIdent, bundleId); } else { final RemoveFlowInputBuilder builder = new RemoveFlowInputBuilder(removeDataObj); builder.setFlowRef(new FlowRef(identifier)); builder.setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class))); builder.setFlowTable(new FlowTableRef(nodeIdent.child(Table.class, tableKey))); // This method is called only when a given flow object has been // removed from datastore. So FRM always needs to set strict flag // into remove-flow input so that only a flow entry associated with // a given flow object is removed. builder.setTransactionUri(new Uri(provider.getNewTransactionId())).setStrict(Boolean.TRUE); final Future> resultFuture = provider.getSalFlowService().removeFlow(builder.build()); JdkFutures.addErrorLogging(resultFuture, LOG, "removeFlow"); } } } // TODO: Pull this into ForwardingRulesCommiter and override it here @Override public Future> removeWithResult(final InstanceIdentifier identifier, final Flow removeDataObj, final InstanceIdentifier nodeIdent) { Future> resultFuture = SettableFuture.create(); final TableKey tableKey = identifier.firstKeyOf(Table.class, TableKey.class); if (tableIdValidationPrecondition(tableKey, removeDataObj)) { final RemoveFlowInputBuilder builder = new RemoveFlowInputBuilder(removeDataObj); builder.setFlowRef(new FlowRef(identifier)); builder.setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class))); builder.setFlowTable(new FlowTableRef(nodeIdent.child(Table.class, tableKey))); // This method is called only when a given flow object has been // removed from datastore. So FRM always needs to set strict flag // into remove-flow input so that only a flow entry associated with // a given flow object is removed. builder.setTransactionUri(new Uri(provider.getNewTransactionId())).setStrict(Boolean.TRUE); resultFuture = provider.getSalFlowService().removeFlow(builder.build()); } return resultFuture; } @Override public void update(final InstanceIdentifier identifier, final Flow original, final Flow update, final InstanceIdentifier nodeIdent) { final TableKey tableKey = identifier.firstKeyOf(Table.class, TableKey.class); if (tableIdValidationPrecondition(tableKey, update)) { BundleId bundleId = getActiveBundle(nodeIdent, provider); if (bundleId != null) { bundleFlowForwarder.update(identifier, original, update, nodeIdent, bundleId); } else { final NodeId nodeId = getNodeIdFromNodeIdentifier(nodeIdent); nodeConfigurator.enqueueJob(nodeId.getValue(), () -> { final UpdateFlowInputBuilder builder = new UpdateFlowInputBuilder(); builder.setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class))); builder.setFlowRef(new FlowRef(identifier)); builder.setTransactionUri(new Uri(provider.getNewTransactionId())); // This method is called only when a given flow object in datastore // has been updated. So FRM always needs to set strict flag into // update-flow input so that only a flow entry associated with // a given flow object is updated. builder.setUpdatedFlow(new UpdatedFlowBuilder(update).setStrict(Boolean.TRUE).build()); builder.setOriginalFlow(new OriginalFlowBuilder(original).setStrict(Boolean.TRUE).build()); Long groupId = isFlowDependentOnGroup(update); ListenableFuture> future = Futures.immediateFuture(null); if (groupId != null) { LOG.trace("The flow {} is dependent on group {}. Checking if the group is already present", getFlowId(new FlowRef(identifier)), groupId); if (isGroupExistsOnDevice(nodeIdent, groupId, provider)) { LOG.trace("The dependent group {} is already programmed. Updating the flow {}", groupId, getFlowId(new FlowRef(identifier))); future = provider.getSalFlowService().updateFlow(builder.build()); JdkFutures.addErrorLogging(future, LOG, "updateFlow"); } else { LOG.trace("The dependent group {} isn't programmed yet. Pushing the group", groupId); ListenableFuture> groupFuture = pushDependentGroup(nodeIdent, groupId); Futures.addCallback(groupFuture, new UpdateFlowCallBack(builder.build(), nodeId, future, groupId), MoreExecutors.directExecutor()); } } else { LOG.trace("The flow {} is not dependent on any group. Updating the flow", getFlowId(new FlowRef(identifier))); future = provider.getSalFlowService().updateFlow(builder.build()); JdkFutures.addErrorLogging(future, LOG, "updateFlow"); } return future; }); } } } @Override public Future> add(final InstanceIdentifier identifier, final Flow addDataObj, final InstanceIdentifier nodeIdent) { final TableKey tableKey = identifier.firstKeyOf(Table.class, TableKey.class); if (tableIdValidationPrecondition(tableKey, addDataObj)) { BundleId bundleId = getActiveBundle(nodeIdent, provider); if (bundleId != null) { return bundleFlowForwarder.add(identifier, addDataObj, nodeIdent, bundleId); } else { final NodeId nodeId = getNodeIdFromNodeIdentifier(nodeIdent); nodeConfigurator.enqueueJob(nodeId.getValue(), () -> { final AddFlowInputBuilder builder = new AddFlowInputBuilder(addDataObj); builder.setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class))); builder.setFlowRef(new FlowRef(identifier)); builder.setFlowTable(new FlowTableRef(nodeIdent.child(Table.class, tableKey))); builder.setTransactionUri(new Uri(provider.getNewTransactionId())); Long groupId = isFlowDependentOnGroup(addDataObj); ListenableFuture> future = SettableFuture.create(); if (groupId != null) { LOG.trace("The flow {} is dependent on group {}. Checking if the group is already present", getFlowId(new FlowRef(identifier)), groupId); if (isGroupExistsOnDevice(nodeIdent, groupId, provider)) { LOG.trace("The dependent group {} is already programmed. Adding the flow {}", groupId, getFlowId(new FlowRef(identifier))); future = provider.getSalFlowService().addFlow(builder.build()); } else { LOG.trace("The dependent group {} isn't programmed yet. Pushing the group", groupId); ListenableFuture> groupFuture = pushDependentGroup(nodeIdent, groupId); Futures.addCallback(groupFuture, new AddFlowCallBack(builder.build(), nodeId, future, groupId), MoreExecutors.directExecutor()); } } else { LOG.trace("The flow {} is not dependent on any group. Adding the flow", getFlowId(new FlowRef(identifier))); future = provider.getSalFlowService().addFlow(builder.build()); } return future; }); } } return Futures.immediateFuture(null); } @Override public void createStaleMarkEntity(InstanceIdentifier identifier, Flow del, InstanceIdentifier nodeIdent) { LOG.debug("Creating Stale-Mark entry for the switch {} for flow {} ", nodeIdent.toString(), del.toString()); StaleFlow staleFlow = makeStaleFlow(identifier, del, nodeIdent); persistStaleFlow(staleFlow, nodeIdent); } @Override protected InstanceIdentifier getWildCardPath() { return InstanceIdentifier.create(Nodes.class).child(Node.class).augmentation(FlowCapableNode.class) .child(Table.class).child(Flow.class); } private static boolean tableIdValidationPrecondition(final TableKey tableKey, final Flow flow) { Preconditions.checkNotNull(tableKey, "TableKey can not be null or empty!"); Preconditions.checkNotNull(flow, "Flow can not be null or empty!"); if (!tableKey.getId().equals(flow.getTableId())) { LOG.warn("TableID in URI tableId={} and in palyload tableId={} is not same.", flow.getTableId(), tableKey.getId()); return false; } return true; } private StaleFlow makeStaleFlow(InstanceIdentifier identifier, Flow del, InstanceIdentifier nodeIdent) { StaleFlowBuilder staleFlowBuilder = new StaleFlowBuilder(del); return staleFlowBuilder.setId(del.getId()).build(); } private void persistStaleFlow(StaleFlow staleFlow, InstanceIdentifier nodeIdent) { WriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction(); writeTransaction.put(LogicalDatastoreType.CONFIGURATION, getStaleFlowInstanceIdentifier(staleFlow, nodeIdent), staleFlow, false); ListenableFuture submitFuture = writeTransaction.submit(); handleStaleFlowResultFuture(submitFuture); } private void handleStaleFlowResultFuture(ListenableFuture submitFuture) { Futures.addCallback(submitFuture, new FutureCallback() { @Override public void onSuccess(Void result) { LOG.debug("Stale Flow creation success"); } @Override public void onFailure(Throwable throwable) { LOG.error("Stale Flow creation failed {}", throwable); } }, MoreExecutors.directExecutor()); } private InstanceIdentifier getStaleFlowInstanceIdentifier( StaleFlow staleFlow, InstanceIdentifier nodeIdent) { return nodeIdent.child(Table.class, new TableKey(staleFlow.getTableId())).child( org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.StaleFlow.class, new StaleFlowKey(new FlowId(staleFlow.getId()))); } private ListenableFuture> pushDependentGroup( final InstanceIdentifier nodeIdent, final Long groupId) { //TODO This read to the DS might have a performance impact. //if the dependent group is not installed than we should just cache the parent group, //till we receive the dependent group DTCN and then push it. InstanceIdentifier groupIdent = buildGroupInstanceIdentifier(nodeIdent, groupId); ListenableFuture> resultFuture; LOG.info("Reading the group from config inventory: {}", groupId); try { Optional group; group = provider.getReadTranaction().read(LogicalDatastoreType.CONFIGURATION, groupIdent).get(); if (group.isPresent()) { final AddGroupInputBuilder builder = new AddGroupInputBuilder(group.get()); builder.setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class))); builder.setGroupRef(new GroupRef(nodeIdent)); builder.setTransactionUri(new Uri(provider.getNewTransactionId())); AddGroupInput addGroupInput = builder.build(); resultFuture = this.provider.getSalGroupService().addGroup(addGroupInput); } else { resultFuture = Futures.immediateFuture(RpcResultBuilder.failed() .withError(RpcError.ErrorType.APPLICATION, "Group " + groupId + " not present in the config inventory").build()); } } catch (InterruptedException | ExecutionException e) { LOG.error("Error while reading group from config datastore for the group ID {}", groupId, e); resultFuture = Futures.immediateFuture(RpcResultBuilder.failed() .withError(RpcError.ErrorType.APPLICATION, "Error while reading group " + groupId + " from inventory").build()); } return resultFuture; } private final class AddFlowCallBack implements FutureCallback> { private final AddFlowInput addFlowInput; private final NodeId nodeId; private ListenableFuture> future; private final Long groupId; private AddFlowCallBack(final AddFlowInput addFlowInput, final NodeId nodeId, ListenableFuture> future, Long groupId) { this.addFlowInput = addFlowInput; this.nodeId = nodeId; this.future = future; this.groupId = groupId; } @Override public void onSuccess(RpcResult rpcResult) { if (rpcResult.isSuccessful()) { provider.getDevicesGroupRegistry().storeGroup(nodeId, groupId); future = provider.getSalFlowService().addFlow(addFlowInput); LOG.debug("Flow add with id {} finished without error for node {}", getFlowId(addFlowInput.getFlowRef()), nodeId); } else { if (rpcResult.getErrors().size() == 1 && rpcResult.getErrors().iterator().next().getMessage() .contains(GROUP_EXISTS_IN_DEVICE_ERROR)) { provider.getDevicesGroupRegistry().storeGroup(nodeId, groupId); future = provider.getSalFlowService().addFlow(addFlowInput); LOG.debug("Group {} already programmed in the device. Adding the flow {}", groupId, getFlowId(addFlowInput.getFlowRef())); } else { LOG.error("Flow add with id {} failed for node {} with error {}", getFlowId(addFlowInput.getFlowRef()), nodeId, rpcResult.getErrors().toString()); future = Futures.immediateFuture(null); } } } @Override public void onFailure(Throwable throwable) { LOG.error("Service call for adding flow with id {} failed for node {} with error {}", getFlowId(addFlowInput.getFlowRef()), nodeId, throwable.getCause()); Futures.immediateFailedFuture(null); } } private final class UpdateFlowCallBack implements FutureCallback> { private final UpdateFlowInput updateFlowInput; private final NodeId nodeId; private ListenableFuture> future; private final Long groupId; private UpdateFlowCallBack(final UpdateFlowInput updateFlowInput, final NodeId nodeId, ListenableFuture> future, Long groupId) { this.updateFlowInput = updateFlowInput; this.nodeId = nodeId; this.future = future; this.groupId = groupId; } @Override public void onSuccess(RpcResult rpcResult) { if (rpcResult.isSuccessful()) { provider.getDevicesGroupRegistry().storeGroup(nodeId, groupId); future = provider.getSalFlowService().updateFlow(updateFlowInput); LOG.debug("Flow update with id {} finished without error for node {}", getFlowId(updateFlowInput.getFlowRef()), nodeId); } else { if (rpcResult.getErrors().size() == 1 && rpcResult.getErrors().iterator().next().getMessage() .contains(GROUP_EXISTS_IN_DEVICE_ERROR)) { provider.getDevicesGroupRegistry().storeGroup(nodeId, groupId); future = provider.getSalFlowService().updateFlow(updateFlowInput); LOG.debug("Group {} already programmed in the device. Updating the flow {}", groupId, getFlowId(updateFlowInput.getFlowRef())); } else { LOG.error("Flow update with id {} failed for node {} with error {}", getFlowId(updateFlowInput.getFlowRef()), nodeId, rpcResult.getErrors().toString()); future = Futures.immediateFuture(null); } } } @Override public void onFailure(Throwable throwable) { LOG.error("Service call for updating flow with id {} failed for node {} with error {}", getFlowId(updateFlowInput.getFlowRef()), nodeId, throwable); future = Futures.immediateFailedFuture(null); } } }