import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
-import java.util.concurrent.Future;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
-import javax.annotation.Nullable;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.openflowplugin.api.OFConstants;
+import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
import org.opendaylight.openflowplugin.applications.frm.FlowNodeReconciliation;
import org.opendaylight.openflowplugin.applications.frm.ForwardingRulesManager;
import org.opendaylight.yang.gen.v1.urn.opendaylight.action.types.rev131112.action.action.GroupActionCase;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.StaleFlow;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.StaleFlowKey;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupTypes;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.Buckets;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.buckets.Bucket;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.MeterId;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupTypes;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.AddBundleMessagesInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.AddBundleMessagesInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.ControlBundleInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.rev170124.BundleControlType;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.rev170124.BundleFlags;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.rev170124.BundleId;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.table.features.TableFeaturesKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.rf.state.rev170713.ResultState;
import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.table.features.TableFeatures;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.table.features.TableFeaturesKey;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
import org.opendaylight.yangtools.yang.common.RpcResult;
private final DataBroker dataBroker;
private final ForwardingRulesManager provider;
+ private final String serviceName;
+ final private int priority;
+ final private ResultState resultState;
+ private Map<DeviceInfo, ListenableFuture<Boolean>> futureMap = new HashMap<>();
+
private final ExecutorService executor = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
private final SalBundleService salBundleService;
private static final AtomicLong BUNDLE_ID = new AtomicLong();
private static final BundleFlags BUNDLE_FLAGS = new BundleFlags(true, true);
- public FlowNodeReconciliationImpl (final ForwardingRulesManager manager, final DataBroker db) {
+ public FlowNodeReconciliationImpl (final ForwardingRulesManager manager, final DataBroker db,
+ final String serviceName, final int priority, final ResultState resultState) {
this.provider = Preconditions.checkNotNull(manager, "ForwardingRulesManager can not be null!");
dataBroker = Preconditions.checkNotNull(db, "DataBroker can not be null!");
+ this.serviceName = serviceName;
+ this.priority = priority;
+ this.resultState = resultState;
salBundleService = Preconditions.checkNotNull(manager.getSalBundleService(),"salBundleService can not be null!");
}
}
}
- @Override
- public void reconcileConfiguration(InstanceIdentifier<FlowCapableNode> connectedNode) {
- if (provider.isReconciliationDisabled()) {
- LOG.debug("Reconciliation is disabled by user. Skipping reconciliation of node : {}", connectedNode
- .firstKeyOf(Node.class));
- return;
+ private ListenableFuture<Boolean> reconcileConfiguration(InstanceIdentifier<FlowCapableNode> connectedNode) {
+ LOG.info("Triggering reconciliation for device {}", connectedNode.firstKeyOf(Node.class));
+ if (provider.isStaleMarkingEnabled()) {
+ LOG.info("Stale-Marking is ENABLED and proceeding with deletion of "
+ + "stale-marked entities on switch {}",
+ connectedNode.toString());
+ reconciliationPreProcess(connectedNode);
}
- if (provider.isNodeOwner(connectedNode)) {
- LOG.info("Triggering reconciliation for device {}", connectedNode.firstKeyOf(Node.class));
- if (provider.isStaleMarkingEnabled()) {
- LOG.info("Stale-Marking is ENABLED and proceeding with deletion of stale-marked entities on switch {}",
- connectedNode.toString());
- reconciliationPreProcess(connectedNode);
- }
- LOG.debug("Bundle based reconciliation status : {}", provider.isBundleBasedReconciliationEnabled()?"Enable":"Disable");
- if (provider.isBundleBasedReconciliationEnabled()) {
- BundleBasedReconciliationTask bundleBasedReconTask = new BundleBasedReconciliationTask(connectedNode);
- executor.execute(bundleBasedReconTask);
- } else {
- ReconciliationTask reconciliationTask = new ReconciliationTask(connectedNode);
- executor.execute(reconciliationTask);
- }
+ LOG.debug("Bundle based reconciliation status : {}", provider.isBundleBasedReconciliationEnabled()?"Enable":"Disable");
+ if (provider.isBundleBasedReconciliationEnabled()) {
+ BundleBasedReconciliationTask bundleBasedReconTask = new BundleBasedReconciliationTask(connectedNode);
+ return JdkFutureAdapters.listenInPoolThread(executor.submit(bundleBasedReconTask));
+ } else {
+ ReconciliationTask reconciliationTask = new ReconciliationTask(connectedNode);
+ return JdkFutureAdapters.listenInPoolThread(executor.submit(reconciliationTask));
}
}
- private class BundleBasedReconciliationTask implements Runnable {
+ private class BundleBasedReconciliationTask implements Callable<Boolean> {
final InstanceIdentifier<FlowCapableNode> nodeIdentity;
public BundleBasedReconciliationTask(final InstanceIdentifier<FlowCapableNode> nodeIdent) {
}
@Override
- public void run() {
+ public Boolean call() {
String sNode = nodeIdentity.firstKeyOf(Node.class, NodeKey.class).getId().getValue();
Optional<FlowCapableNode> flowNode = Optional.absent();
BundleId bundleIdValue = new BundleId(BUNDLE_ID.getAndIncrement());
/* Bundles not supported for meters*/
List<Meter> meters = flowNode.get().getMeter() != null
? flowNode.get().getMeter() : Collections.emptyList();
- ListenableFuture<RpcResult<Void>> meterFuture =
- Futures.transformAsync(commitBundleFuture, rpcResult -> {
- if (rpcResult.isSuccessful()) {
- for (Meter meter : meters) {
- final KeyedInstanceIdentifier<Meter, MeterKey> meterIdent =
- nodeIdentity.child(Meter.class, meter.getKey());
- provider.getMeterCommiter().add(meterIdent, meter, nodeIdentity);
- }
+ ListenableFuture<RpcResult<Void>> meterFuture =
+ Futures.transformAsync(commitBundleFuture, rpcResult -> {
+ if (rpcResult.isSuccessful()) {
+ for (Meter meter : meters) {
+ final KeyedInstanceIdentifier<Meter, MeterKey> meterIdent =
+ nodeIdentity.child(Meter.class, meter.getKey());
+ provider.getMeterCommiter().add(meterIdent, meter, nodeIdentity);
+ }
+ }
+ return Futures.immediateFuture(null);
+ });
+
+ trans.close();
+ try {
+ if(commitBundleFuture.get().isSuccessful()) {
+ LOG.debug("Completing bundle based reconciliation for device ID:{}", nDpId);
+ return true;
+ } else {
+ return false;
}
- return Futures.immediateFuture(null);
- });
-
- LOG.debug("Completing bundle based reconciliation for device ID:{}", nDpId);
- trans.close();
+ } catch (InterruptedException | ExecutionException e) {
+ LOG.error("Error while doing bundle based reconciliation for device ID:{}", nodeIdentity);
+ return false;
+ }
}
+ LOG.error("FlowNode not present for Datapath ID {}", nDpId);
+ return false;
}
}
- private class ReconciliationTask implements Runnable {
+ @Override
+ public ListenableFuture<Boolean> startReconciliation(DeviceInfo node) {
+ InstanceIdentifier<FlowCapableNode> connectedNode = node.getNodeInstanceIdentifier()
+ .augmentation(FlowCapableNode.class);
+ return futureMap.computeIfAbsent(node, future -> reconcileConfiguration(connectedNode));
+ }
+
+ @Override
+ public ListenableFuture<Boolean> endReconciliation(DeviceInfo node) {
+ futureMap.computeIfPresent(node, (key, future) -> future).cancel(true);
+ futureMap.remove(node);
+ return Futures.immediateFuture(true);
+ }
+
+ @Override
+ public int getPriority() {
+ return priority;
+ }
+
+ @Override
+ public String getName() {
+ return serviceName;
+ }
+
+ @Override
+ public ResultState getResultState() {
+ return resultState;
+ }
+
+ private class ReconciliationTask implements Callable<Boolean> {
InstanceIdentifier<FlowCapableNode> nodeIdentity;
nodeIdentity = nodeIdent;
}
- @Override
- public void run() {
+ public Boolean call() {
String sNode = nodeIdentity.firstKeyOf(Node.class, NodeKey.class).getId().getValue();
BigInteger nDpId = getDpnIdFromNodeName(sNode);
ReadOnlyTransaction trans = provider.getReadTranaction();
Optional<FlowCapableNode> flowNode = Optional.absent();
-
//initialize the counter
int counter = 0;
try {
flowNode = trans.read(LogicalDatastoreType.CONFIGURATION, nodeIdentity).get();
} catch (Exception e) {
LOG.warn("Fail with read Config/DS for Node {} !", nodeIdentity, e);
+ return false;
}
if (flowNode.isPresent()) {
}
/* clean transaction */
trans.close();
+ return true;
}
/**