package org.opendaylight.openflowplugin.applications.frm.impl;
-
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
+import java.util.concurrent.Future;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
+import javax.annotation.Nullable;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.openflowplugin.api.OFConstants;
import org.opendaylight.openflowplugin.applications.frm.FlowNodeReconciliation;
import org.opendaylight.openflowplugin.applications.frm.ForwardingRulesManager;
import org.opendaylight.yang.gen.v1.urn.opendaylight.action.types.rev131112.action.action.GroupActionCase;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.GroupKey;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.StaleGroup;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.StaleGroupKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.MeterId;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.table.features.TableFeatures;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupTypes;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.AddBundleMessagesInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.AddBundleMessagesInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.ControlBundleInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.ControlBundleInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.SalBundleService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.Messages;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.MessagesBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.messages.Message;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.add.bundle.messages.input.messages.MessageBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.BundleAddFlowCaseBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.BundleAddGroupCaseBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.BundleRemoveFlowCaseBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.BundleRemoveGroupCaseBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.bundle.add.flow._case.AddFlowCaseDataBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.bundle.add.group._case.AddGroupCaseDataBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.bundle.remove.flow._case.RemoveFlowCaseDataBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.bundle.inner.message.grouping.bundle.inner.message.bundle.remove.group._case.RemoveGroupCaseDataBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.rev170124.BundleControlType;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.rev170124.BundleFlags;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.rev170124.BundleId;
import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.table.features.TableFeaturesKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.table.features.TableFeatures;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
+import org.opendaylight.yangtools.yang.common.RpcResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private final ForwardingRulesManager provider;
private final ExecutorService executor = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
+ private final SalBundleService salBundleService;
+
+ private static final AtomicLong BUNDLE_ID = new AtomicLong();
+ private static final BundleFlags BUNDLE_FLAGS = new BundleFlags(true, true);
+
public FlowNodeReconciliationImpl (final ForwardingRulesManager manager, final DataBroker db) {
this.provider = Preconditions.checkNotNull(manager, "ForwardingRulesManager can not be null!");
dataBroker = Preconditions.checkNotNull(db, "DataBroker can not be null!");
+ salBundleService = Preconditions.checkNotNull(manager.getSalBundleService(),"salBundleService can not be null!");
}
@Override
connectedNode.toString());
reconciliationPreProcess(connectedNode);
}
- ReconciliationTask reconciliationTask = new ReconciliationTask(connectedNode);
- executor.execute(reconciliationTask);
+ LOG.debug("Bundle based reconciliation status : {}", provider.isBundleBasedReconciliationEnabled()?"Enable":"Disable");
+ if (provider.isBundleBasedReconciliationEnabled()) {
+ BundleBasedReconciliationTask bundleBasedReconTask = new BundleBasedReconciliationTask(connectedNode);
+ executor.execute(bundleBasedReconTask);
+ } else {
+ ReconciliationTask reconciliationTask = new ReconciliationTask(connectedNode);
+ executor.execute(reconciliationTask);
+ }
+ }
+ }
+
+ private class BundleBasedReconciliationTask implements Runnable {
+ final InstanceIdentifier<FlowCapableNode> nodeIdentity;
+
+ public BundleBasedReconciliationTask(final InstanceIdentifier<FlowCapableNode> nodeIdent) {
+ nodeIdentity = nodeIdent;
+ }
+
+ @Override
+ public void run() {
+ String sNode = nodeIdentity.firstKeyOf(Node.class, NodeKey.class).getId().getValue();
+ Optional<FlowCapableNode> flowNode = Optional.absent();
+ BundleId bundleIdValue = new BundleId(BUNDLE_ID.getAndIncrement());
+ BigInteger nDpId = getDpnIdFromNodeName(sNode);
+ LOG.debug("Triggering bundle based reconciliation for device :{}", nDpId);
+ ReadOnlyTransaction trans = provider.getReadTranaction();
+ try {
+ flowNode = trans.read(LogicalDatastoreType.CONFIGURATION, nodeIdentity).get();
+ } catch (Exception e) {
+ LOG.error("Error occurred while reading the configuration data store for node {}", nodeIdentity, e);
+ }
+
+ if (flowNode.isPresent()) {
+ LOG.debug("FlowNode present for Datapath ID {}", nDpId);
+ final NodeRef nodeRef = new NodeRef(nodeIdentity.firstIdentifierOf(Node.class));
+
+ final ControlBundleInput openBundleInput = new ControlBundleInputBuilder()
+ .setNode(nodeRef)
+ .setBundleId(bundleIdValue)
+ .setFlags(BUNDLE_FLAGS)
+ .setType(BundleControlType.ONFBCTOPENREQUEST)
+ .build();
+
+ final ControlBundleInput commitBundleInput = new ControlBundleInputBuilder()
+ .setNode(nodeRef)
+ .setBundleId(bundleIdValue)
+ .setFlags(BUNDLE_FLAGS)
+ .setType(BundleControlType.ONFBCTCOMMITREQUEST)
+ .build();
+
+ final AddBundleMessagesInput addBundleMessagesInput = new AddBundleMessagesInputBuilder()
+ .setNode(nodeRef)
+ .setBundleId(bundleIdValue)
+ .setFlags(BUNDLE_FLAGS)
+ .setMessages(createMessages(nodeRef, flowNode))
+ .build();
+
+ Future<RpcResult<Void>> openBundle = salBundleService.controlBundle(openBundleInput);
+
+ ListenableFuture<RpcResult<Void>> addBundleMessagesFuture =
+ Futures.transformAsync(JdkFutureAdapters.listenInPoolThread(openBundle), rpcResult -> {
+ if (rpcResult.isSuccessful()) {
+ return JdkFutureAdapters.listenInPoolThread(
+ salBundleService.addBundleMessages(addBundleMessagesInput));
+ }
+ return Futures.immediateFuture(null);
+ });
+
+ ListenableFuture<RpcResult<Void>> commitBundleFuture =
+ Futures.transformAsync(addBundleMessagesFuture, rpcResult -> {
+ if (rpcResult.isSuccessful()) {
+ return JdkFutureAdapters.listenInPoolThread(
+ salBundleService.controlBundle(commitBundleInput));
+ }
+ return Futures.immediateFuture(null);
+ });
+
+ /* Bundles not supported for meters*/
+ List<Meter> meters = flowNode.get().getMeter() != null
+ ? flowNode.get().getMeter() : Collections.emptyList();
+ ListenableFuture<RpcResult<Void>> meterFuture =
+ Futures.transformAsync(commitBundleFuture, rpcResult -> {
+ if (rpcResult.isSuccessful()) {
+ for (Meter meter : meters) {
+ final KeyedInstanceIdentifier<Meter, MeterKey> meterIdent =
+ nodeIdentity.child(Meter.class, meter.getKey());
+ provider.getMeterCommiter().add(meterIdent, meter, nodeIdentity);
+ }
+ }
+ return Futures.immediateFuture(null);
+ });
+
+ LOG.debug("Completing bundle based reconciliation for device ID:{}", nDpId);
+ trans.close();
+ }
}
}
@Override
public void run() {
-
String sNode = nodeIdentity.firstKeyOf(Node.class, NodeKey.class).getId().getValue();
BigInteger nDpId = getDpnIdFromNodeName(sNode);
}
});
}
+
+ private Flow getDeleteAllFlow(){
+ final FlowBuilder flowBuilder = new FlowBuilder();
+ flowBuilder.setTableId(OFConstants.OFPTT_ALL);
+ return flowBuilder.build();
+ }
+
+ private Group getDeleteAllGroup(){
+ final GroupBuilder groupBuilder = new GroupBuilder();
+ groupBuilder.setGroupType(GroupTypes.GroupAll);
+ groupBuilder.setGroupId(new GroupId(OFConstants.OFPG_ALL));
+ return groupBuilder.build();
+ }
+
+ private Messages createMessages(final NodeRef nodeRef , final Optional<FlowCapableNode> flowNode) {
+ final List<Message> messages = new ArrayList<>();
+ messages.add(new MessageBuilder().setNode(nodeRef).setBundleInnerMessage(
+ new BundleRemoveFlowCaseBuilder()
+ .setRemoveFlowCaseData(new RemoveFlowCaseDataBuilder(getDeleteAllFlow()).build()).build()).build());
+
+ messages.add(new MessageBuilder().setNode(nodeRef).setBundleInnerMessage(
+ new BundleRemoveGroupCaseBuilder()
+ .setRemoveGroupCaseData(new RemoveGroupCaseDataBuilder(getDeleteAllGroup()).build()).build()).build());
+
+ if(flowNode.get().getGroup()!= null) {
+ for (Group gr : flowNode.get().getGroup()) {
+ messages.add(new MessageBuilder().setNode(nodeRef).setBundleInnerMessage(
+ new BundleAddGroupCaseBuilder()
+ .setAddGroupCaseData(new AddGroupCaseDataBuilder(gr).build()).build()).build());
+ }
+ }
+
+ if(flowNode.get().getTable()!= null) {
+ for (Table table : flowNode.get().getTable()) {
+ for (Flow flow : table.getFlow()) {
+ messages.add(new MessageBuilder().setNode(nodeRef).setBundleInnerMessage(
+ new BundleAddFlowCaseBuilder()
+ .setAddFlowCaseData(new AddFlowCaseDataBuilder(flow).build()).build()).build());
+ }
+ }
+ }
+
+ LOG.debug("The size of the flows and group messages created in createMessage() {}", messages.size());
+ return new MessagesBuilder().setMessage(messages).build();
+ }
}
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.SalMeterService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.SalBundleService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.forwardingrules.manager.config.rev160511.ForwardingRulesManagerConfig;
import org.opendaylight.yang.gen.v1.urn.opendaylight.table.service.rev131026.SalTableService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.table.features.TableFeatures;
private final SalTableService salTableService;
private final ClusterSingletonServiceProvider clusterSingletonServiceProvider;
private final NotificationProviderService notificationService;
+ private final SalBundleService salBundleService;
private final AutoCloseable configurationServiceRegistration;
private ForwardingRulesCommiter<Flow> flowListener;
private ForwardingRulesCommiter<Group> groupListener;
private boolean disableReconciliation;
private boolean staleMarkingEnabled;
private int reconciliationRetryCount;
+ private boolean isBundleBasedReconciliationEnabled;
public ForwardingRulesManagerImpl(final DataBroker dataBroker,
final RpcConsumerRegistry rpcRegistry,
disableReconciliation = config.isDisableReconciliation();
staleMarkingEnabled = config.isStaleMarkingEnabled();
reconciliationRetryCount = config.getReconciliationRetryCount();
-
+ isBundleBasedReconciliationEnabled = config.isBundleBasedReconciliationEnabled();
this.configurationServiceRegistration = configurationService.registerListener(this);
this.dataService = Preconditions.checkNotNull(dataBroker, "DataBroker can not be null!");
this.clusterSingletonServiceProvider = Preconditions.checkNotNull(clusterSingletonService,
"RPC SalMeterService not found.");
this.salTableService = Preconditions.checkNotNull(rpcRegistry.getRpcService(SalTableService.class),
"RPC SalTableService not found.");
+ this.salBundleService = Preconditions.checkNotNull(rpcRegistry.getRpcService(SalBundleService.class),
+ "RPC SalBundlService not found.");
}
@Override
return salTableService;
}
+ @Override
+ public SalBundleService getSalBundleService() {
+ return salBundleService;
+ }
+
@Override
public ForwardingRulesCommiter<Flow> getFlowCommiter() {
return flowListener;
return flowNodeConnectorInventoryTranslatorImpl;
}
+ @Override
+ public boolean isBundleBasedReconciliationEnabled() {
+ return isBundleBasedReconciliationEnabled;
+ }
+
@Override
public boolean isNodeOwner(InstanceIdentifier<FlowCapableNode> ident) {
return Objects.nonNull(ident) && deviceMastershipManager.isDeviceMastered(ident.firstKeyOf(Node.class).getId());
case RECONCILIATION_RETRY_COUNT:
reconciliationRetryCount = Integer.valueOf(propertyValue);
break;
+ case BUNDLE_BASED_RECONCILIATION_ENABLED:
+ isBundleBasedReconciliationEnabled = Boolean.valueOf(propertyValue);
+ break;
}
});
}