import static java.util.Collections.emptyList;
import static org.opendaylight.netvirt.elan.utils.ElanUtils.isVxlanNetworkOrVxlanSegment;
-import com.google.common.base.Optional;
import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.FluentFuture;
+import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.SettableFuture;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutionException;
import javax.annotation.PreDestroy;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.eclipse.jdt.annotation.NonNull;
import org.eclipse.jdt.annotation.Nullable;
-import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.genius.mdsalutil.MDSALUtil;
+import org.opendaylight.genius.datastoreutils.SingleTransactionDataBroker;
import org.opendaylight.genius.utils.hwvtep.HwvtepNodeHACache;
import org.opendaylight.genius.utils.hwvtep.HwvtepSouthboundUtils;
import org.opendaylight.infrautils.jobcoordinator.JobCoordinator;
+import org.opendaylight.mdsal.binding.api.DataBroker;
+import org.opendaylight.mdsal.binding.util.ManagedNewTransactionRunner;
+import org.opendaylight.mdsal.binding.util.RetryingManagedNewTransactionRunner;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
import org.opendaylight.netvirt.elan.cache.ElanInstanceCache;
import org.opendaylight.netvirt.elan.l2gw.ha.listeners.HAOpClusteredListener;
import org.opendaylight.netvirt.elan.l2gw.jobs.AssociateHwvtepToElanJob;
private final ElanRefUtil elanRefUtil;
private final L2GatewayServiceRecoveryHandler l2GatewayServiceRecoveryHandler;
private final ServiceRecoveryRegistry serviceRecoveryRegistry;
+ private final ManagedNewTransactionRunner txRunner;
@Inject
public L2GatewayConnectionUtils(DataBroker dataBroker, ElanClusterUtils elanClusterUtils,
this.elanRefUtil = elanRefUtil;
this.l2GatewayServiceRecoveryHandler = l2GatewayServiceRecoveryHandler;
this.serviceRecoveryRegistry = serviceRecoveryRegistry;
+ this.txRunner = new RetryingManagedNewTransactionRunner(dataBroker);
}
@Override
LOG.debug("getNeutronL2gateway for {}", l2GatewayId.getValue());
InstanceIdentifier<L2gateway> inst = InstanceIdentifier.create(Neutron.class).child(L2gateways.class)
.child(L2gateway.class, new L2gatewayKey(l2GatewayId));
- return MDSALUtil.read(broker, LogicalDatastoreType.CONFIGURATION, inst).orNull();
+ try {
+ return SingleTransactionDataBroker.syncReadOptional(broker, LogicalDatastoreType.CONFIGURATION, inst)
+ .orElse(null);
+ } catch (ExecutionException | InterruptedException e) {
+ LOG.error("getNeutronL2gateway: Exception while reading L2gateway DS for the ID {}", l2GatewayId, e);
+ }
+ return null;
}
@NonNull
public static List<L2gateway> getL2gatewayList(DataBroker broker) {
InstanceIdentifier<L2gateways> inst = InstanceIdentifier.create(Neutron.class).child(L2gateways.class);
- return MDSALUtil.read(broker, LogicalDatastoreType.CONFIGURATION, inst).toJavaUtil().map(
- L2gateways::getL2gateway).orElse(emptyList());
+ try {
+ return SingleTransactionDataBroker.syncReadOptional(broker, LogicalDatastoreType.CONFIGURATION, inst).map(
+ L2gateways::getL2gateway).orElse(emptyList());
+ } catch (ExecutionException | InterruptedException e) {
+ LOG.error("getNeutronL2gateway: Exception while reading L2gateway DS", e);
+ }
+ return null;
}
@NonNull
public static List<L2gatewayConnection> getAllL2gatewayConnections(DataBroker broker) {
InstanceIdentifier<L2gatewayConnections> inst = InstanceIdentifier.create(Neutron.class)
.child(L2gatewayConnections.class);
- return MDSALUtil.read(broker, LogicalDatastoreType.CONFIGURATION, inst).toJavaUtil().map(
- L2gatewayConnections::getL2gatewayConnection).orElse(emptyList());
+ try {
+ return SingleTransactionDataBroker.syncReadOptional(broker, LogicalDatastoreType.CONFIGURATION, inst).map(
+ L2gatewayConnections::getL2gatewayConnection).orElse(emptyList());
+ } catch (ExecutionException | InterruptedException e) {
+ LOG.error("getNeutronL2gateway: Exception while reading L2gateway DS", e);
+ }
+ return null;
}
/**
// comes we need to wait for elaninstance to resolve. Hence updating the map with the runnable .
// When elanInstance add comes , it look in to the map and run the associated runnable associated with it.
ElanInstance elanInstance = elanInstanceCache.get(networkUuid.getValue(),
- () -> addL2GatewayConnection(input, l2GwDeviceName)).orNull();
+ () -> addL2GatewayConnection(input, l2GwDeviceName)).orElse(null);
if (elanInstance == null) {
return;
}
final InstanceIdentifier<Node> nodeIid = HwvtepSouthboundUtils.createInstanceIdentifier(
new NodeId(l2GatewayDevice.getHwvtepNodeId()));
jobCoordinator.enqueueJob(elanName + ":" + l2GatewayDevice.getDeviceName(), () -> {
- final SettableFuture settableFuture = SettableFuture.create();
- Futures.addCallback(broker.newReadOnlyTransaction().read(LogicalDatastoreType.OPERATIONAL,
- nodeIid), new SettableFutureCallback<Optional<Node>>(settableFuture) {
- @Override
- public void onSuccess(@NonNull Optional<Node> resultNode) {
+ FluentFuture<Optional<Node>> fluentFuture = broker.newReadOnlyTransaction().read(
+ LogicalDatastoreType.OPERATIONAL, nodeIid);
+ Futures.addCallback(fluentFuture, new FutureCallback<Optional<Node>>() {
+ @Override
+ public void onSuccess(Optional<Node> nodeOptional) {
+ if (nodeOptional.isPresent()) {
+ Node node = nodeOptional.get();
+ if (node.augmentation(HwvtepGlobalAugmentation.class) != null) {
+ List<LocalUcastMacs> localUcastMacs =
+ node.augmentation(HwvtepGlobalAugmentation.class).getLocalUcastMacs();
+ if (localUcastMacs == null) {
+ return;
+ }
LocalUcastMacListener localUcastMacListener =
new LocalUcastMacListener(broker, haOpClusteredListener,
elanL2GatewayUtils, jobCoordinator, elanInstanceCache, hwvtepNodeHACache,
l2GatewayServiceRecoveryHandler, serviceRecoveryRegistry);
- settableFuture.set(resultNode);
- Optional<Node> nodeOptional = resultNode;
- if (nodeOptional.isPresent()) {
- Node node = nodeOptional.get();
- if (node.augmentation(HwvtepGlobalAugmentation.class) != null) {
- List<LocalUcastMacs> localUcastMacs =
- node.augmentation(HwvtepGlobalAugmentation.class).getLocalUcastMacs();
- if (localUcastMacs == null) {
- return;
- }
- localUcastMacs.stream()
- .filter((mac) -> macBelongsToLogicalSwitch(mac, elanName))
- .forEach((mac) -> {
- InstanceIdentifier<LocalUcastMacs> macIid = getMacIid(nodeIid, mac);
- localUcastMacListener.added(macIid, mac);
- });
- }
- }
+ localUcastMacs.stream()
+ .filter((mac) -> macBelongsToLogicalSwitch(mac, elanName))
+ .forEach((mac) -> {
+ InstanceIdentifier<LocalUcastMacs> macIid = getMacIid(nodeIid, mac);
+ localUcastMacListener.added(macIid, mac);
+ });
}
- }, MoreExecutors.directExecutor());
- return Lists.newArrayList(settableFuture);
+ }
+ }
+
+ @Override
+ public void onFailure(Throwable throwable) {
+ }
+ }, MoreExecutors.directExecutor());
+ return Lists.newArrayList(fluentFuture);
} , 5);
}