*/
package org.opendaylight.netvirt.elan.utils;
-import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.base.Optional;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
-import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.SettableFuture;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.function.Function;
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.genius.datastoreutils.DataStoreJobCoordinator;
-import org.opendaylight.genius.utils.SystemPropertyReader;
-import org.opendaylight.genius.utils.clustering.ClusteringUtils;
+import org.opendaylight.genius.utils.clustering.EntityOwnershipUtils;
import org.opendaylight.genius.utils.hwvtep.HwvtepSouthboundConstants;
-import org.opendaylight.netvirt.elan.internal.ElanServiceProvider;
+import org.opendaylight.netvirt.elan.l2gw.utils.SettableFutureCallback;
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.List;
-import java.util.concurrent.Callable;
-
-public class ElanClusterUtils {
- private static final Logger logger = LoggerFactory.getLogger(ElanClusterUtils.class);
+public final class ElanClusterUtils {
+ private static final Logger LOG = LoggerFactory.getLogger(ElanClusterUtils.class);
- private static ElanServiceProvider elanServiceProvider = null;
- public static void setElanServiceProvider(ElanServiceProvider elanServiceProvider) {
- ElanClusterUtils.elanServiceProvider = elanServiceProvider;
+ private ElanClusterUtils() {
}
- public static void runOnlyInLeaderNode(Runnable job) {
- runOnlyInLeaderNode(job, "");
+ public static void runOnlyInOwnerNode(EntityOwnershipUtils entityOwnershipUtils, String jobDesc, Runnable job) {
+ entityOwnershipUtils.runOnlyInOwnerNode(HwvtepSouthboundConstants.ELAN_ENTITY_TYPE,
+ HwvtepSouthboundConstants.ELAN_ENTITY_NAME, DataStoreJobCoordinator.getInstance(), jobDesc, job);
}
- public static void runOnlyInLeaderNode(final Runnable job, final String jobDescription) {
- ListenableFuture<Boolean> checkEntityOwnerFuture = ClusteringUtils.checkNodeEntityOwner(
- elanServiceProvider.getEntityOwnershipService(), HwvtepSouthboundConstants.ELAN_ENTITY_TYPE,
- HwvtepSouthboundConstants.ELAN_ENTITY_NAME);
- Futures.addCallback(checkEntityOwnerFuture, new FutureCallback<Boolean>() {
- @Override
- public void onSuccess(Boolean isOwner) {
- if (isOwner) {
- job.run();
- } else {
- logger.trace("job is not run as i m not cluster owner desc :{} ", jobDescription);
- }
- }
- @Override
- public void onFailure(Throwable error) {
- logger.error("Failed to identity cluster owner ", error);
- }
- });
+ public static void runOnlyInOwnerNode(EntityOwnershipUtils entityOwnershipUtils, String jobKey, String jobDesc,
+ Callable<List<ListenableFuture<Void>>> job) {
+ entityOwnershipUtils.runOnlyInOwnerNode(HwvtepSouthboundConstants.ELAN_ENTITY_TYPE,
+ HwvtepSouthboundConstants.ELAN_ENTITY_NAME, DataStoreJobCoordinator.getInstance(), jobKey, jobDesc, job);
}
- public static void runOnlyInLeaderNode(String jobKey, Callable<List<ListenableFuture<Void>>> dataStoreJob) {
- runOnlyInLeaderNode(jobKey, "", dataStoreJob);
- }
+ public static <T extends DataObject> void asyncReadAndExecute(final DataBroker broker,
+ final LogicalDatastoreType datastoreType,
+ final InstanceIdentifier<T> iid,
+ final String jobKey,
+ final Function<Optional<T>, Void> function) {
+ DataStoreJobCoordinator.getInstance().enqueueJob(jobKey, () -> {
+ SettableFuture settableFuture = SettableFuture.create();
+ List<ListenableFuture<Void>> futures = Collections.singletonList(settableFuture);
- public static void runOnlyInLeaderNode(final String jobKey, final String jobDescription,
- final Callable<List<ListenableFuture<Void>>> dataStoreJob) {
- ListenableFuture<Boolean> checkEntityOwnerFuture = ClusteringUtils.checkNodeEntityOwner(
- elanServiceProvider.getEntityOwnershipService(), HwvtepSouthboundConstants.ELAN_ENTITY_TYPE,
- HwvtepSouthboundConstants.ELAN_ENTITY_NAME);
- Futures.addCallback(checkEntityOwnerFuture, new FutureCallback<Boolean>() {
- @Override
- public void onSuccess(Boolean isOwner) {
- if (isOwner) {
- logger.trace("scheduling job {} ", jobDescription);
- elanServiceProvider.getDataStoreJobCoordinator().enqueueJob(jobKey, dataStoreJob,
- SystemPropertyReader.getDataStoreJobCoordinatorMaxRetries());
- } else {
- logger.trace("job is not run as i m not cluster owner desc :{} ", jobDescription);
- }
- }
- @Override
- public void onFailure(Throwable error) {
- logger.error("Failed to identity cluster owner for job "+jobDescription, error);
- }
- });
- }
+ ReadWriteTransaction tx = broker.newReadWriteTransaction();
+ Futures.addCallback(tx.read(datastoreType, iid),
+ new SettableFutureCallback<Optional<T>>(settableFuture) {
+ @Override
+ public void onSuccess(Optional<T> data) {
+ function.apply(data);
+ super.onSuccess(data);
+ }
+ }, MoreExecutors.directExecutor());
+
+ return futures;
+ }, ElanConstants.JOB_MAX_RETRIES);
+ }
}