X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=elanmanager%2Fimpl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fnetvirt%2Felan%2Fl2gw%2Flisteners%2FL2GatewayConnectionListener.java;h=bf5aceec22331d565e5d1f20419f92c464a8c139;hb=109e8de722fbf0608a026dbbe151c3ff7fab5950;hp=66419e80621d9261f46feecfc8576b98dab3b626;hpb=d668a0b0f86a84c884a71a17121d5fbf70606a07;p=netvirt.git diff --git a/elanmanager/impl/src/main/java/org/opendaylight/netvirt/elan/l2gw/listeners/L2GatewayConnectionListener.java b/elanmanager/impl/src/main/java/org/opendaylight/netvirt/elan/l2gw/listeners/L2GatewayConnectionListener.java index 66419e8062..bf5aceec22 100644 --- a/elanmanager/impl/src/main/java/org/opendaylight/netvirt/elan/l2gw/listeners/L2GatewayConnectionListener.java +++ b/elanmanager/impl/src/main/java/org/opendaylight/netvirt/elan/l2gw/listeners/L2GatewayConnectionListener.java @@ -10,39 +10,46 @@ package org.opendaylight.netvirt.elan.l2gw.listeners; import static java.util.stream.Collectors.groupingBy; import static java.util.stream.Collectors.toList; import static java.util.stream.Collectors.toMap; -import static org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION; +import static org.opendaylight.mdsal.common.api.LogicalDatastoreType.CONFIGURATION; -import com.google.common.base.Optional; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.MoreExecutors; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.function.BiPredicate; import java.util.function.Function; import java.util.function.Predicate; -import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; import javax.inject.Inject; import javax.inject.Singleton; - -import org.opendaylight.controller.md.sal.binding.api.DataBroker; -import org.opendaylight.controller.md.sal.binding.api.DataObjectModification; -import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction; -import org.opendaylight.genius.datastoreutils.AsyncClusteredDataTreeChangeListenerBase; -import org.opendaylight.genius.mdsalutil.MDSALUtil; +import org.opendaylight.genius.datastoreutils.SingleTransactionDataBroker; import org.opendaylight.genius.utils.hwvtep.HwvtepSouthboundUtils; import org.opendaylight.infrautils.metrics.Counter; import org.opendaylight.infrautils.metrics.Labeled; import org.opendaylight.infrautils.metrics.MetricDescriptor; import org.opendaylight.infrautils.metrics.MetricProvider; +import org.opendaylight.infrautils.utils.concurrent.Executors; +import org.opendaylight.mdsal.binding.api.DataBroker; +import org.opendaylight.mdsal.binding.api.DataObjectModification; +import org.opendaylight.mdsal.binding.api.ReadTransaction; +import org.opendaylight.mdsal.common.api.LogicalDatastoreType; import org.opendaylight.netvirt.elan.l2gw.ha.HwvtepHAUtil; +import org.opendaylight.netvirt.elan.l2gw.recovery.impl.L2GatewayConnectionInstanceRecoveryHandler; +import org.opendaylight.netvirt.elan.l2gw.recovery.impl.L2GatewayServiceRecoveryHandler; import org.opendaylight.netvirt.elan.l2gw.utils.L2GatewayConnectionUtils; import org.opendaylight.netvirt.elan.utils.Scheduler; import org.opendaylight.netvirt.neutronvpn.api.l2gw.L2GatewayCache; import org.opendaylight.netvirt.neutronvpn.api.l2gw.L2GatewayDevice; +import org.opendaylight.serviceutils.srm.RecoverableListener; +import org.opendaylight.serviceutils.srm.ServiceRecoveryRegistry; +import org.opendaylight.serviceutils.tools.listener.AbstractClusteredAsyncDataTreeChangeListener; import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.IpAddress; import org.opendaylight.yang.gen.v1.urn.opendaylight.neutron.l2gateways.rev150712.l2gateway.connections.attributes.L2gatewayConnections; import org.opendaylight.yang.gen.v1.urn.opendaylight.neutron.l2gateways.rev150712.l2gateway.connections.attributes.l2gatewayconnections.L2gatewayConnection; @@ -57,8 +64,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; @Singleton -public class L2GatewayConnectionListener extends AsyncClusteredDataTreeChangeListenerBase { +public class L2GatewayConnectionListener extends AbstractClusteredAsyncDataTreeChangeListener + implements RecoverableListener { private static final Logger LOG = LoggerFactory.getLogger(L2GatewayConnectionListener.class); private static final int MAX_READ_TRIALS = 120; @@ -75,8 +82,8 @@ public class L2GatewayConnectionListener extends AsyncClusteredDataTreeChangeLis private static final Predicate IS_HA_PARENT_NODE = (node) -> { HwvtepGlobalAugmentation augmentation = node.augmentation(HwvtepGlobalAugmentation.class); - if (augmentation != null && augmentation.getManagers() != null) { - return augmentation.getManagers().stream().anyMatch( + if (augmentation != null && augmentation.nonnullManagers() != null) { + return augmentation.nonnullManagers().values().stream().anyMatch( manager -> manager.key().getTarget().getValue().equals(HwvtepHAUtil.MANAGER_KEY)); } return false; @@ -94,17 +101,26 @@ public class L2GatewayConnectionListener extends AsyncClusteredDataTreeChangeLis @Inject public L2GatewayConnectionListener(final DataBroker db, L2GatewayConnectionUtils l2GatewayConnectionUtils, Scheduler scheduler, L2GatewayCache l2GatewayCache, - MetricProvider metricProvider) { - super(L2gatewayConnection.class, L2GatewayConnectionListener.class); + MetricProvider metricProvider, + final L2GatewayServiceRecoveryHandler l2GatewayServiceRecoveryHandler, + final L2GatewayConnectionInstanceRecoveryHandler l2InstanceRecoveryHandler, + final ServiceRecoveryRegistry serviceRecoveryRegistry) { + super(db, LogicalDatastoreType.CONFIGURATION, InstanceIdentifier.create(Neutron.class) + .child(L2gatewayConnections.class).child(L2gatewayConnection.class), + Executors.newListeningSingleThreadExecutor("L2GatewayConnectionListener", LOG)); this.broker = db; this.l2GatewayConnectionUtils = l2GatewayConnectionUtils; this.scheduler = scheduler; this.l2GatewayCache = l2GatewayCache; this.elanConnectionsCounter = metricProvider.newCounter(MetricDescriptor.builder() .anchor(this).project("netvirt").module("l2gw").id("connections").build(), "modification", "elan"); + serviceRecoveryRegistry.addRecoverableListener(l2GatewayServiceRecoveryHandler.buildServiceRegistryKey(), + this); + serviceRecoveryRegistry.addRecoverableListener(l2InstanceRecoveryHandler.buildServiceRegistryKey(), + this); + init(); } - @PostConstruct public void init() { loadL2GwDeviceCache(1); LOG.trace("Loading l2gw connection cache"); @@ -112,7 +128,25 @@ public class L2GatewayConnectionListener extends AsyncClusteredDataTreeChangeLis } @Override - protected void add(final InstanceIdentifier identifier, final L2gatewayConnection input) { + @PreDestroy + public void close() { + super.close(); + Executors.shutdownAndAwaitTermination(getExecutorService()); + } + + @Override + public void registerListener() { + super.register(); + LOG.info("Registering L2GatewayConnectionListener"); + } + + public void deregisterListener() { + super.close(); + LOG.info("Deregistering L2GatewayConnectionListener"); + } + + @Override + public void add(final InstanceIdentifier identifier, final L2gatewayConnection input) { LOG.trace("Adding L2gatewayConnection: {}", input); elanConnectionsCounter .label(DataObjectModification.ModificationType.WRITE.name()) @@ -125,7 +159,7 @@ public class L2GatewayConnectionListener extends AsyncClusteredDataTreeChangeLis } @Override - protected void remove(InstanceIdentifier identifier, L2gatewayConnection input) { + public void remove(InstanceIdentifier identifier, L2gatewayConnection input) { LOG.trace("Removing L2gatewayConnection: {}", input); elanConnectionsCounter .label(DataObjectModification.ModificationType.DELETE.name()) @@ -134,22 +168,11 @@ public class L2GatewayConnectionListener extends AsyncClusteredDataTreeChangeLis } @Override - protected void update(InstanceIdentifier identifier, L2gatewayConnection original, + public void update(InstanceIdentifier identifier, L2gatewayConnection original, L2gatewayConnection update) { LOG.trace("Updating L2gatewayConnection : original value={}, updated value={}", original, update); } - @Override - protected InstanceIdentifier getWildCardPath() { - return InstanceIdentifier.create(Neutron.class).child(L2gatewayConnections.class) - .child(L2gatewayConnection.class); - } - - @Override - protected L2GatewayConnectionListener getDataTreeChangeListener() { - return this; - } - @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD", justification = "https://github.com/spotbugs/spotbugs/issues/811") private void loadL2GwDeviceCache(final int trialNo) { @@ -158,15 +181,15 @@ public class L2GatewayConnectionListener extends AsyncClusteredDataTreeChangeLis LOG.error("Failed to read config topology"); return; } - ReadOnlyTransaction tx = broker.newReadOnlyTransaction(); + ReadTransaction tx = broker.newReadOnlyTransaction(); InstanceIdentifier topoIid = HwvtepSouthboundUtils.createHwvtepTopologyInstanceIdentifier(); Futures.addCallback(tx.read(CONFIGURATION, topoIid), new FutureCallback>() { @Override public void onSuccess(Optional topologyOptional) { if (topologyOptional != null && topologyOptional.isPresent()) { - loadL2GwDeviceCache(topologyOptional.get().getNode()); + loadL2GwDeviceCache(new ArrayList(topologyOptional.get().nonnullNode().values())); } - registerListener(CONFIGURATION, broker); + registerListener(); } @Override @@ -224,10 +247,16 @@ public class L2GatewayConnectionListener extends AsyncClusteredDataTreeChangeLis .create(Neutron.class) .child(L2gatewayConnections.class); - Optional optional = MDSALUtil.read(broker, CONFIGURATION, parentIid); + Optional optional = Optional.empty(); + try { + optional = SingleTransactionDataBroker.syncReadOptional(broker, CONFIGURATION, + parentIid); + } catch (ExecutionException | InterruptedException e) { + LOG.error("loadL2GwConnectionCache: Exception while reading L2gatewayConnections DS", e); + } if (optional.isPresent() && optional.get().getL2gatewayConnection() != null) { LOG.trace("Found some connections to fill in l2gw connection cache"); - optional.get().getL2gatewayConnection() + new ArrayList<>(optional.get().nonnullL2gatewayConnection().values()) .forEach(connection -> { add(parentIid.child(L2gatewayConnection.class, connection.key()), connection); }); @@ -242,7 +271,8 @@ public class L2GatewayConnectionListener extends AsyncClusteredDataTreeChangeLis l2GwDevice.setHwvtepNodeId(globalNode.getNodeId().getValue()); List tunnelIps = psNode.augmentation(PhysicalSwitchAugmentation.class) != null - ? psNode.augmentation(PhysicalSwitchAugmentation.class).getTunnelIps() : null; + ? new ArrayList<>(psNode.augmentation(PhysicalSwitchAugmentation.class) + .nonnullTunnelIps().values()) : null; if (tunnelIps != null) { for (TunnelIps tunnelIp : tunnelIps) { IpAddress tunnelIpAddr = tunnelIp.getTunnelIpsKey();