/* * Copyright (c) 2020 Ericsson India Global Services Pvt Ltd. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ package org.opendaylight.netvirt.elan.l2gw.ha; import com.google.common.util.concurrent.FluentFuture; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.SettableFuture; import java.util.Collection; import java.util.Map; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicInteger; import org.opendaylight.genius.utils.batching.ResourceBatchingManager; import org.opendaylight.mdsal.binding.util.Datastore; import org.opendaylight.mdsal.binding.util.Datastore.Configuration; import org.opendaylight.mdsal.binding.util.TypedReadWriteTransaction; import org.opendaylight.mdsal.common.api.LogicalDatastoreType; import org.opendaylight.netvirt.elan.l2gw.ha.listeners.HAJobScheduler; import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId; import org.opendaylight.yangtools.util.concurrent.FluentFutures; import org.opendaylight.yangtools.yang.binding.DataObject; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class BatchedTransaction implements TypedReadWriteTransaction { private static final Logger LOG = LoggerFactory.getLogger(BatchedTransaction.class); private static Map> configInProgress = new ConcurrentHashMap<>(); private static Map> opInProgress = new ConcurrentHashMap<>(); private Map> currentOps = new ConcurrentHashMap<>(); private SettableFuture result = SettableFuture.create(); private boolean updateMetric; private NodeId srcNodeId; private Class type; public BatchedTransaction(Class logicalDatastoreType) { this.type = logicalDatastoreType; } public ListenableFuture getResult() { return result; } @Override public FluentFuture> read(InstanceIdentifier instanceIdentifier) { try { return ResourceBatchingManager.getInstance() .read(getShard().name(), instanceIdentifier); } catch (ExecutionException | InterruptedException e) { LOG.error("BatchTxn failed to Read {}", instanceIdentifier); } return FluentFutures.immediateFailedFluentFuture(new Throwable()); } @Override public FluentFuture exists(InstanceIdentifier path) { // NOT SUPPORTED return FluentFutures.immediateFailedFluentFuture(new Throwable()); } ResourceBatchingManager.ShardResource getShard() { if (Configuration.class.equals(type)) { return ResourceBatchingManager.ShardResource.CONFIG_TOPOLOGY; } return ResourceBatchingManager.ShardResource.OPERATIONAL_TOPOLOGY; } public static synchronized void markUpdateInProgress(Class type, InstanceIdentifier instanceIdentifier, ListenableFuture ft) { markUpdateInProgress(type, instanceIdentifier, ft, ""); } public static synchronized void markUpdateInProgress(Class type, InstanceIdentifier instanceIdentifier,ListenableFuture ft, String desc) { if (Configuration.class.equals(type)) { // NodeKey nodeKey = (NodeKey) instanceIdentifier.firstKeyOf(Node.class); configInProgress.put(instanceIdentifier, ft); Futures.addCallback(ft, new FutureCallback() { @Override public void onSuccess(Void result) { configInProgress.remove(instanceIdentifier); } @Override public void onFailure(Throwable throwable) { configInProgress.remove(instanceIdentifier); LOG.error("Failed to update mdsal op {}", instanceIdentifier, throwable); } }, MoreExecutors.directExecutor()); } else { opInProgress.put(instanceIdentifier, ft); Futures.addCallback(ft, new FutureCallback() { @Override public void onSuccess(Void result) { opInProgress.remove(instanceIdentifier); } @Override public void onFailure(Throwable throwable) { opInProgress.remove(instanceIdentifier); } }, MoreExecutors.directExecutor()); } } public static synchronized boolean isInProgress(LogicalDatastoreType logicalDatastoreType, InstanceIdentifier instanceIdentifier) { ListenableFuture ft = getInprogressFt(logicalDatastoreType, instanceIdentifier); return ft != null && !ft.isDone() && !ft.isCancelled(); } public static synchronized boolean addCallbackIfInProgress(LogicalDatastoreType logicalDatastoreType, InstanceIdentifier instanceIdentifier, Runnable runnable) { ListenableFuture ft = getInprogressFt(logicalDatastoreType, instanceIdentifier); if (ft != null && !ft.isDone() && !ft.isCancelled()) { Futures.addCallback(ft, new FutureCallback() { @Override public void onSuccess(Void result) { HAJobScheduler.getInstance().submitJob(runnable); } @Override public void onFailure(Throwable throwable) { HAJobScheduler.getInstance().submitJob(runnable); } }, MoreExecutors.directExecutor()); return true; } if (logicalDatastoreType == LogicalDatastoreType.CONFIGURATION) { configInProgress.remove(instanceIdentifier); } else { opInProgress.remove(instanceIdentifier); } return false; } static ListenableFuture getInprogressFt(LogicalDatastoreType logicalDatastoreType, InstanceIdentifier instanceIdentifier) { if (logicalDatastoreType == LogicalDatastoreType.CONFIGURATION) { return configInProgress.get(instanceIdentifier); } else { return opInProgress.get(instanceIdentifier); } } public void waitForCompletion() { if (currentOps.isEmpty()) { return; } Collection> fts = currentOps.values(); for (ListenableFuture ft : fts) { try { ft.get(); } catch (InterruptedException | ExecutionException e) { LOG.error("Failed to get ft result ", e); } } } @Override public void put(InstanceIdentifier instanceIdentifier, T dataObj) { ListenableFuture ft = ResourceBatchingManager.getInstance().put(getShard(), instanceIdentifier, dataObj); markUpdateInProgress(type, instanceIdentifier, ft); currentOps.put(instanceIdentifier, ft); } @Override public void mergeParentStructurePut(InstanceIdentifier path, T data) { } @Override public void merge(InstanceIdentifier instanceIdentifier, T dataObj) { ListenableFuture ft = ResourceBatchingManager.getInstance().merge(getShard(), instanceIdentifier, dataObj); markUpdateInProgress(type, instanceIdentifier, ft); currentOps.put(instanceIdentifier, ft); } @Override public void mergeParentStructureMerge(InstanceIdentifier path, T data) { //NOT SUPPORTED } public ListenableFuture getFt(InstanceIdentifier instanceIdentifier) { return currentOps.get(instanceIdentifier); } @Override public void delete(InstanceIdentifier instanceIdentifier) { ListenableFuture ft = ResourceBatchingManager.getInstance().delete(getShard(), instanceIdentifier); markUpdateInProgress(type, instanceIdentifier, ft); currentOps.put(instanceIdentifier, ft); } public ListenableFuture submit() { if (currentOps.isEmpty()) { return Futures.immediateFuture(null); } Collection> fts = currentOps.values(); AtomicInteger waitCount = new AtomicInteger(fts.size()); for (ListenableFuture ft : fts) { Futures.addCallback(ft, new FutureCallback() { @Override public void onSuccess(Void voidResult) { if (waitCount.decrementAndGet() == 0) { result.set(null); } } @Override public void onFailure(Throwable throwable) { result.setException(throwable); } }, MoreExecutors.directExecutor()); } return result; } @Override public Object getIdentifier() { return "BatchedTransaction"; } public void setSrcNodeId(NodeId srcNodeId) { this.srcNodeId = srcNodeId; } public NodeId getSrcNodeId() { return srcNodeId; } public boolean updateMetric() { return updateMetric; } public void updateMetric(Boolean update) { this.updateMetric = update; } }