Using nonNull API's in ELAN
[netvirt.git] / elanmanager / impl / src / main / java / org / opendaylight / netvirt / elan / l2gw / listeners / L2GatewayConnectionListener.java
index 66419e80621d9261f46feecfc8576b98dab3b626..bf5aceec22331d565e5d1f20419f92c464a8c139 100644 (file)
@@ -10,39 +10,46 @@ package org.opendaylight.netvirt.elan.l2gw.listeners;
 import static java.util.stream.Collectors.groupingBy;
 import static java.util.stream.Collectors.toList;
 import static java.util.stream.Collectors.toMap;
-import static org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION;
+import static org.opendaylight.mdsal.common.api.LogicalDatastoreType.CONFIGURATION;
 
-import com.google.common.base.Optional;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.MoreExecutors;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.function.BiPredicate;
 import java.util.function.Function;
 import java.util.function.Predicate;
-import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
 import javax.inject.Inject;
 import javax.inject.Singleton;
-
-import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
-import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
-import org.opendaylight.genius.datastoreutils.AsyncClusteredDataTreeChangeListenerBase;
-import org.opendaylight.genius.mdsalutil.MDSALUtil;
+import org.opendaylight.genius.datastoreutils.SingleTransactionDataBroker;
 import org.opendaylight.genius.utils.hwvtep.HwvtepSouthboundUtils;
 import org.opendaylight.infrautils.metrics.Counter;
 import org.opendaylight.infrautils.metrics.Labeled;
 import org.opendaylight.infrautils.metrics.MetricDescriptor;
 import org.opendaylight.infrautils.metrics.MetricProvider;
+import org.opendaylight.infrautils.utils.concurrent.Executors;
+import org.opendaylight.mdsal.binding.api.DataBroker;
+import org.opendaylight.mdsal.binding.api.DataObjectModification;
+import org.opendaylight.mdsal.binding.api.ReadTransaction;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
 import org.opendaylight.netvirt.elan.l2gw.ha.HwvtepHAUtil;
+import org.opendaylight.netvirt.elan.l2gw.recovery.impl.L2GatewayConnectionInstanceRecoveryHandler;
+import org.opendaylight.netvirt.elan.l2gw.recovery.impl.L2GatewayServiceRecoveryHandler;
 import org.opendaylight.netvirt.elan.l2gw.utils.L2GatewayConnectionUtils;
 import org.opendaylight.netvirt.elan.utils.Scheduler;
 import org.opendaylight.netvirt.neutronvpn.api.l2gw.L2GatewayCache;
 import org.opendaylight.netvirt.neutronvpn.api.l2gw.L2GatewayDevice;
+import org.opendaylight.serviceutils.srm.RecoverableListener;
+import org.opendaylight.serviceutils.srm.ServiceRecoveryRegistry;
+import org.opendaylight.serviceutils.tools.listener.AbstractClusteredAsyncDataTreeChangeListener;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.IpAddress;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.neutron.l2gateways.rev150712.l2gateway.connections.attributes.L2gatewayConnections;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.neutron.l2gateways.rev150712.l2gateway.connections.attributes.l2gatewayconnections.L2gatewayConnection;
@@ -57,8 +64,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 @Singleton
-public class L2GatewayConnectionListener extends AsyncClusteredDataTreeChangeListenerBase<L2gatewayConnection,
-        L2GatewayConnectionListener> {
+public class L2GatewayConnectionListener extends AbstractClusteredAsyncDataTreeChangeListener<L2gatewayConnection>
+        implements RecoverableListener {
     private static final Logger LOG = LoggerFactory.getLogger(L2GatewayConnectionListener.class);
     private static final int MAX_READ_TRIALS = 120;
 
@@ -75,8 +82,8 @@ public class L2GatewayConnectionListener extends AsyncClusteredDataTreeChangeLis
 
     private static final Predicate<Node> IS_HA_PARENT_NODE = (node) -> {
         HwvtepGlobalAugmentation augmentation = node.augmentation(HwvtepGlobalAugmentation.class);
-        if (augmentation != null && augmentation.getManagers() != null) {
-            return augmentation.getManagers().stream().anyMatch(
+        if (augmentation != null && augmentation.nonnullManagers() != null) {
+            return augmentation.nonnullManagers().values().stream().anyMatch(
                 manager -> manager.key().getTarget().getValue().equals(HwvtepHAUtil.MANAGER_KEY));
         }
         return false;
@@ -94,17 +101,26 @@ public class L2GatewayConnectionListener extends AsyncClusteredDataTreeChangeLis
     @Inject
     public L2GatewayConnectionListener(final DataBroker db, L2GatewayConnectionUtils l2GatewayConnectionUtils,
                                        Scheduler scheduler, L2GatewayCache l2GatewayCache,
-                                       MetricProvider metricProvider) {
-        super(L2gatewayConnection.class, L2GatewayConnectionListener.class);
+                                       MetricProvider metricProvider,
+                                       final L2GatewayServiceRecoveryHandler l2GatewayServiceRecoveryHandler,
+                                       final L2GatewayConnectionInstanceRecoveryHandler l2InstanceRecoveryHandler,
+                                       final ServiceRecoveryRegistry serviceRecoveryRegistry) {
+        super(db, LogicalDatastoreType.CONFIGURATION, InstanceIdentifier.create(Neutron.class)
+                .child(L2gatewayConnections.class).child(L2gatewayConnection.class),
+                Executors.newListeningSingleThreadExecutor("L2GatewayConnectionListener", LOG));
         this.broker = db;
         this.l2GatewayConnectionUtils = l2GatewayConnectionUtils;
         this.scheduler = scheduler;
         this.l2GatewayCache = l2GatewayCache;
         this.elanConnectionsCounter = metricProvider.newCounter(MetricDescriptor.builder()
                 .anchor(this).project("netvirt").module("l2gw").id("connections").build(), "modification", "elan");
+        serviceRecoveryRegistry.addRecoverableListener(l2GatewayServiceRecoveryHandler.buildServiceRegistryKey(),
+                this);
+        serviceRecoveryRegistry.addRecoverableListener(l2InstanceRecoveryHandler.buildServiceRegistryKey(),
+                this);
+        init();
     }
 
-    @PostConstruct
     public void init() {
         loadL2GwDeviceCache(1);
         LOG.trace("Loading l2gw connection cache");
@@ -112,7 +128,25 @@ public class L2GatewayConnectionListener extends AsyncClusteredDataTreeChangeLis
     }
 
     @Override
-    protected void add(final InstanceIdentifier<L2gatewayConnection> identifier, final L2gatewayConnection input) {
+    @PreDestroy
+    public void close() {
+        super.close();
+        Executors.shutdownAndAwaitTermination(getExecutorService());
+    }
+
+    @Override
+    public void registerListener() {
+        super.register();
+        LOG.info("Registering L2GatewayConnectionListener");
+    }
+
+    public void deregisterListener() {
+        super.close();
+        LOG.info("Deregistering L2GatewayConnectionListener");
+    }
+
+    @Override
+    public void add(final InstanceIdentifier<L2gatewayConnection> identifier, final L2gatewayConnection input) {
         LOG.trace("Adding L2gatewayConnection: {}", input);
         elanConnectionsCounter
                 .label(DataObjectModification.ModificationType.WRITE.name())
@@ -125,7 +159,7 @@ public class L2GatewayConnectionListener extends AsyncClusteredDataTreeChangeLis
     }
 
     @Override
-    protected void remove(InstanceIdentifier<L2gatewayConnection> identifier, L2gatewayConnection input) {
+    public void remove(InstanceIdentifier<L2gatewayConnection> identifier, L2gatewayConnection input) {
         LOG.trace("Removing L2gatewayConnection: {}", input);
         elanConnectionsCounter
                 .label(DataObjectModification.ModificationType.DELETE.name())
@@ -134,22 +168,11 @@ public class L2GatewayConnectionListener extends AsyncClusteredDataTreeChangeLis
     }
 
     @Override
-    protected void update(InstanceIdentifier<L2gatewayConnection> identifier, L2gatewayConnection original,
+    public void update(InstanceIdentifier<L2gatewayConnection> identifier, L2gatewayConnection original,
             L2gatewayConnection update) {
         LOG.trace("Updating L2gatewayConnection : original value={}, updated value={}", original, update);
     }
 
-    @Override
-    protected InstanceIdentifier<L2gatewayConnection> getWildCardPath() {
-        return InstanceIdentifier.create(Neutron.class).child(L2gatewayConnections.class)
-            .child(L2gatewayConnection.class);
-    }
-
-    @Override
-    protected L2GatewayConnectionListener getDataTreeChangeListener() {
-        return this;
-    }
-
     @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
             justification = "https://github.com/spotbugs/spotbugs/issues/811")
     private void loadL2GwDeviceCache(final int trialNo) {
@@ -158,15 +181,15 @@ public class L2GatewayConnectionListener extends AsyncClusteredDataTreeChangeLis
                 LOG.error("Failed to read config topology");
                 return;
             }
-            ReadOnlyTransaction tx = broker.newReadOnlyTransaction();
+            ReadTransaction tx = broker.newReadOnlyTransaction();
             InstanceIdentifier<Topology> topoIid = HwvtepSouthboundUtils.createHwvtepTopologyInstanceIdentifier();
             Futures.addCallback(tx.read(CONFIGURATION, topoIid), new FutureCallback<Optional<Topology>>() {
                 @Override
                 public void onSuccess(Optional<Topology> topologyOptional) {
                     if (topologyOptional != null && topologyOptional.isPresent()) {
-                        loadL2GwDeviceCache(topologyOptional.get().getNode());
+                        loadL2GwDeviceCache(new ArrayList<Node>(topologyOptional.get().nonnullNode().values()));
                     }
-                    registerListener(CONFIGURATION, broker);
+                    registerListener();
                 }
 
                 @Override
@@ -224,10 +247,16 @@ public class L2GatewayConnectionListener extends AsyncClusteredDataTreeChangeLis
                 .create(Neutron.class)
                 .child(L2gatewayConnections.class);
 
-        Optional<L2gatewayConnections> optional = MDSALUtil.read(broker, CONFIGURATION, parentIid);
+        Optional<L2gatewayConnections> optional = Optional.empty();
+        try {
+            optional = SingleTransactionDataBroker.syncReadOptional(broker, CONFIGURATION,
+                    parentIid);
+        } catch (ExecutionException | InterruptedException e) {
+            LOG.error("loadL2GwConnectionCache: Exception while reading L2gatewayConnections DS", e);
+        }
         if (optional.isPresent() && optional.get().getL2gatewayConnection() != null) {
             LOG.trace("Found some connections to fill in l2gw connection cache");
-            optional.get().getL2gatewayConnection()
+            new ArrayList<>(optional.get().nonnullL2gatewayConnection().values())
                     .forEach(connection -> {
                         add(parentIid.child(L2gatewayConnection.class, connection.key()), connection);
                     });
@@ -242,7 +271,8 @@ public class L2GatewayConnectionListener extends AsyncClusteredDataTreeChangeLis
         l2GwDevice.setHwvtepNodeId(globalNode.getNodeId().getValue());
 
         List<TunnelIps> tunnelIps = psNode.augmentation(PhysicalSwitchAugmentation.class) != null
-                ? psNode.augmentation(PhysicalSwitchAugmentation.class).getTunnelIps() : null;
+                ? new ArrayList<>(psNode.augmentation(PhysicalSwitchAugmentation.class)
+            .nonnullTunnelIps().values()) : null;
         if (tunnelIps != null) {
             for (TunnelIps tunnelIp : tunnelIps) {
                 IpAddress tunnelIpAddr = tunnelIp.getTunnelIpsKey();