Bump upstreams
[bgpcep.git] / bgp / openconfig-state / src / main / java / org / opendaylight / protocol / bgp / state / StateProviderImpl.java
index c813bc8f5d2453302ac203e4669930a6abb0562d..7d7571302df621ee8805ec78f8634b40a0f8c72b 100644 (file)
@@ -8,8 +8,8 @@
 package org.opendaylight.protocol.bgp.state;
 
 import static java.util.Objects.requireNonNull;
-import static java.util.concurrent.TimeUnit.SECONDS;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.MoreExecutors;
 import java.util.HashMap;
@@ -21,26 +21,30 @@ import java.util.TimerTask;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
+import javax.annotation.PreDestroy;
+import javax.inject.Inject;
+import javax.inject.Singleton;
 import org.checkerframework.checker.lock.qual.GuardedBy;
 import org.eclipse.jdt.annotation.NonNull;
 import org.opendaylight.mdsal.binding.api.DataBroker;
-import org.opendaylight.mdsal.binding.api.Transaction;
 import org.opendaylight.mdsal.binding.api.TransactionChain;
-import org.opendaylight.mdsal.binding.api.TransactionChainListener;
+import org.opendaylight.mdsal.binding.api.WriteOperations;
 import org.opendaylight.mdsal.binding.api.WriteTransaction;
 import org.opendaylight.mdsal.common.api.CommitInfo;
 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
 import org.opendaylight.protocol.bgp.openconfig.spi.BGPTableTypeRegistryConsumer;
 import org.opendaylight.protocol.bgp.rib.spi.state.BGPPeerState;
 import org.opendaylight.protocol.bgp.rib.spi.state.BGPRibState;
-import org.opendaylight.protocol.bgp.rib.spi.state.BGPStateConsumer;
+import org.opendaylight.protocol.bgp.rib.spi.state.BGPStateProvider;
 import org.opendaylight.yang.gen.v1.http.openconfig.net.yang.bgp.rev151009.bgp.top.Bgp;
 import org.opendaylight.yang.gen.v1.http.openconfig.net.yang.bgp.rev151009.bgp.top.BgpBuilder;
 import org.opendaylight.yang.gen.v1.http.openconfig.net.yang.bgp.rev151009.bgp.top.bgp.Global;
 import org.opendaylight.yang.gen.v1.http.openconfig.net.yang.bgp.rev151009.bgp.top.bgp.Neighbors;
 import org.opendaylight.yang.gen.v1.http.openconfig.net.yang.bgp.rev151009.bgp.top.bgp.PeerGroups;
+import org.opendaylight.yang.gen.v1.http.openconfig.net.yang.network.instance.rev151018.OpenconfigNetworkInstanceData;
 import org.opendaylight.yang.gen.v1.http.openconfig.net.yang.network.instance.rev151018.network.instance.top.NetworkInstances;
 import org.opendaylight.yang.gen.v1.http.openconfig.net.yang.network.instance.rev151018.network.instance.top.network.instances.NetworkInstance;
 import org.opendaylight.yang.gen.v1.http.openconfig.net.yang.network.instance.rev151018.network.instance.top.network.instances.NetworkInstanceKey;
@@ -53,84 +57,117 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev180329.bgp.rib.RibKey;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
+import org.opendaylight.yangtools.yang.common.Empty;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.RequireServiceComponentRuntime;
+import org.osgi.service.metatype.annotations.AttributeDefinition;
+import org.osgi.service.metatype.annotations.Designate;
+import org.osgi.service.metatype.annotations.ObjectClassDefinition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 // This class is thread-safe
-public final class StateProviderImpl implements TransactionChainListener, AutoCloseable {
+@Singleton
+@Component(service = {})
+@Designate(ocd = StateProviderImpl.Configuration.class)
+@RequireServiceComponentRuntime
+public final class StateProviderImpl implements FutureCallback<Empty>, AutoCloseable {
+    @ObjectClassDefinition
+    public @interface Configuration {
+        @AttributeDefinition(description = "Name of the OpenConfig network instance to which to bind")
+        String networkInstanceName() default "global-bgp";
+
+        @AttributeDefinition(description = "Statistics update interval, in seconds", min = "1")
+        int updateIntervalSeconds() default 5;
+    }
+
     private static final Logger LOG = LoggerFactory.getLogger(StateProviderImpl.class);
-    private final BGPStateConsumer stateCollector;
+
+    private final BGPStateProvider stateProvider;
     private final BGPTableTypeRegistryConsumer bgpTableTypeRegistry;
     private final KeyedInstanceIdentifier<NetworkInstance, NetworkInstanceKey> networkInstanceIId;
-    private final int timeout;
     private final DataBroker dataBroker;
     @GuardedBy("this")
     private final Map<String, InstanceIdentifier<Bgp>> instanceIdentifiersCache = new HashMap<>();
     @GuardedBy("this")
     private TransactionChain transactionChain;
     @GuardedBy("this")
-    private ScheduledFuture<?> scheduleTask;
+    private final ScheduledFuture<?> scheduleTask;
     private final ScheduledExecutorService scheduler;
     private final AtomicBoolean closed = new AtomicBoolean(false);
 
+    @Activate
+    public StateProviderImpl(@Reference final @NonNull DataBroker dataBroker,
+            @Reference final @NonNull BGPTableTypeRegistryConsumer bgpTableTypeRegistry,
+            @Reference final @NonNull BGPStateProvider stateProvider, final @NonNull Configuration configuration) {
+        this(dataBroker, configuration.updateIntervalSeconds(), bgpTableTypeRegistry, stateProvider,
+                configuration.networkInstanceName());
+    }
+
+    @Inject
     public StateProviderImpl(final @NonNull DataBroker dataBroker, final int timeout,
             final @NonNull BGPTableTypeRegistryConsumer bgpTableTypeRegistry,
-            final @NonNull BGPStateConsumer stateCollector, final @NonNull String networkInstanceName) {
-        this(dataBroker, timeout, bgpTableTypeRegistry, stateCollector, networkInstanceName,
+            final @NonNull BGPStateProvider stateProvider,
+            final @NonNull String networkInstanceName) {
+        this(dataBroker, timeout, TimeUnit.SECONDS, bgpTableTypeRegistry, stateProvider, networkInstanceName,
                 Executors.newScheduledThreadPool(1));
     }
 
-    public StateProviderImpl(final @NonNull DataBroker dataBroker, final int timeout,
+    @VisibleForTesting
+    StateProviderImpl(final @NonNull DataBroker dataBroker, final long period, final TimeUnit timeUnit,
             final @NonNull BGPTableTypeRegistryConsumer bgpTableTypeRegistry,
-            final @NonNull BGPStateConsumer stateCollector,
+            final @NonNull BGPStateProvider stateProvider,
             final @NonNull String networkInstanceName, final @NonNull ScheduledExecutorService scheduler) {
         this.dataBroker = requireNonNull(dataBroker);
         this.bgpTableTypeRegistry = requireNonNull(bgpTableTypeRegistry);
-        this.stateCollector = requireNonNull(stateCollector);
-        this.networkInstanceIId = InstanceIdentifier.create(NetworkInstances.class)
+        this.stateProvider = requireNonNull(stateProvider);
+        networkInstanceIId =
+            InstanceIdentifier.builderOfInherited(OpenconfigNetworkInstanceData.class, NetworkInstances.class).build()
                 .child(NetworkInstance.class, new NetworkInstanceKey(networkInstanceName));
-        this.timeout = timeout;
         this.scheduler = scheduler;
-    }
 
-    public synchronized void init() {
-        this.transactionChain = this.dataBroker.createMergingTransactionChain(this);
+        transactionChain = this.dataBroker.createMergingTransactionChain();
+        transactionChain.addCallback(this);
         final TimerTask task = new TimerTask() {
             @Override
             @SuppressWarnings("checkstyle:IllegalCatch")
             public void run() {
                 synchronized (StateProviderImpl.this) {
-                    final WriteTransaction wTx = StateProviderImpl.this.transactionChain.newWriteOnlyTransaction();
+                    final WriteTransaction wTx = transactionChain.newWriteOnlyTransaction();
                     try {
                         updateBGPStats(wTx);
-
-                        wTx.commit().addCallback(new FutureCallback<CommitInfo>() {
-                            @Override
-                            public void onSuccess(final CommitInfo result) {
-                                LOG.debug("Successfully committed BGP stats update");
-                            }
-
-                            @Override
-                            public void onFailure(final Throwable ex) {
-                                LOG.error("Failed to commit BGP stats update", ex);
-                            }
-                        }, MoreExecutors.directExecutor());
                     } catch (final Exception e) {
                         LOG.warn("Failed to prepare Tx for BGP stats update", e);
                         wTx.cancel();
+                        return;
                     }
+
+                    wTx.commit().addCallback(new FutureCallback<CommitInfo>() {
+                        @Override
+                        public void onSuccess(final CommitInfo result) {
+                            LOG.debug("Successfully committed BGP stats update");
+                        }
+
+                        @Override
+                        public void onFailure(final Throwable ex) {
+                            LOG.error("Failed to commit BGP stats update", ex);
+                        }
+                    }, MoreExecutors.directExecutor());
                 }
             }
         };
 
-        this.scheduleTask = this.scheduler.scheduleAtFixedRate(task, 0, this.timeout, SECONDS);
+        scheduleTask = this.scheduler.scheduleAtFixedRate(task, 0, period, timeUnit);
     }
 
-    private synchronized void updateBGPStats(final WriteTransaction wtx) {
-        final Set<String> oldStats = new HashSet<>(this.instanceIdentifiersCache.keySet());
-        this.stateCollector.getRibStats().stream().filter(BGPRibState::isActive).forEach(bgpStateConsumer -> {
+    private synchronized void updateBGPStats(final WriteOperations wtx) {
+        final Set<String> oldStats = new HashSet<>(instanceIdentifiersCache.keySet());
+        stateProvider.getRibStats().stream().filter(BGPRibState::isActive).forEach(bgpStateConsumer -> {
             final KeyedInstanceIdentifier<Rib, RibKey> ribId = bgpStateConsumer.getInstanceIdentifier();
-            final List<BGPPeerState> peerStats = this.stateCollector.getPeerStats().stream()
+            final List<BGPPeerState> peerStats = stateProvider.getPeerStats().stream()
                     .filter(BGPPeerState::isActive).filter(peerState -> ribId.equals(peerState.getInstanceIdentifier()))
                     .collect(Collectors.toList());
             storeOperationalState(bgpStateConsumer, peerStats, ribId.getKey().getId().getValue(), wtx);
@@ -139,38 +176,41 @@ public final class StateProviderImpl implements TransactionChainListener, AutoCl
         oldStats.forEach(ribId -> removeStoredOperationalState(ribId, wtx));
     }
 
-    private synchronized void removeStoredOperationalState(final String ribId, final WriteTransaction wtx) {
-        final InstanceIdentifier<Bgp> bgpIID = this.instanceIdentifiersCache.remove(ribId);
+    private synchronized void removeStoredOperationalState(final String ribId, final WriteOperations wtx) {
+        final InstanceIdentifier<Bgp> bgpIID = instanceIdentifiersCache.remove(ribId);
         wtx.delete(LogicalDatastoreType.OPERATIONAL, bgpIID);
     }
 
     private synchronized void storeOperationalState(final BGPRibState bgpStateConsumer,
-            final List<BGPPeerState> peerStats, final String ribId, final WriteTransaction wtx) {
-        final Global global = GlobalUtil.buildGlobal(bgpStateConsumer, this.bgpTableTypeRegistry);
+            final List<BGPPeerState> peerStats, final String ribId, final WriteOperations wtx) {
+        final Global global = GlobalUtil.buildGlobal(bgpStateConsumer, bgpTableTypeRegistry);
         final PeerGroups peerGroups = PeerGroupUtil.buildPeerGroups(peerStats);
-        final Neighbors neighbors = NeighborUtil.buildNeighbors(peerStats, this.bgpTableTypeRegistry);
-        InstanceIdentifier<Bgp> bgpIID = this.instanceIdentifiersCache.get(ribId);
+        final Neighbors neighbors = NeighborUtil.buildNeighbors(peerStats, bgpTableTypeRegistry);
+        InstanceIdentifier<Bgp> bgpIID = instanceIdentifiersCache.get(ribId);
         if (bgpIID == null) {
-            final ProtocolKey protocolKey = new ProtocolKey(BGP.class, bgpStateConsumer.getInstanceIdentifier()
+            final ProtocolKey protocolKey = new ProtocolKey(BGP.VALUE, bgpStateConsumer.getInstanceIdentifier()
                     .getKey().getId().getValue());
-            final KeyedInstanceIdentifier<Protocol, ProtocolKey> protocolIId = this.networkInstanceIId
+            final KeyedInstanceIdentifier<Protocol, ProtocolKey> protocolIId = networkInstanceIId
                     .child(Protocols.class).child(Protocol.class, protocolKey);
             bgpIID = protocolIId.augmentation(NetworkInstanceProtocol.class).child(Bgp.class);
-            this.instanceIdentifiersCache.put(ribId, bgpIID);
+            instanceIdentifiersCache.put(ribId, bgpIID);
         }
 
         final Bgp bgp = new BgpBuilder().setGlobal(global).setNeighbors(neighbors).setPeerGroups(peerGroups).build();
         wtx.mergeParentStructurePut(LogicalDatastoreType.OPERATIONAL, bgpIID, bgp);
     }
 
+    @Deactivate
+    @PreDestroy
     @Override
     public synchronized void close() {
         if (closed.compareAndSet(false, true)) {
-            this.scheduleTask.cancel(true);
-            if (!this.instanceIdentifiersCache.keySet().isEmpty()) {
-                final WriteTransaction wTx = this.transactionChain.newWriteOnlyTransaction();
-                this.instanceIdentifiersCache.keySet().iterator()
-                .forEachRemaining(ribId -> removeStoredOperationalState(ribId, wTx));
+            scheduleTask.cancel(true);
+            if (!instanceIdentifiersCache.isEmpty()) {
+                final WriteTransaction wTx = transactionChain.newWriteOnlyTransaction();
+                instanceIdentifiersCache.values()
+                        .forEach(bgpIID -> wTx.delete(LogicalDatastoreType.OPERATIONAL, bgpIID));
+                instanceIdentifiersCache.clear();
                 wTx.commit().addCallback(new FutureCallback<CommitInfo>() {
                     @Override
                     public void onSuccess(final CommitInfo result) {
@@ -183,25 +223,24 @@ public final class StateProviderImpl implements TransactionChainListener, AutoCl
                     }
                 }, MoreExecutors.directExecutor());
             }
-            this.transactionChain.close();
-            this.scheduler.shutdown();
+            transactionChain.close();
+            scheduler.shutdown();
         }
     }
 
     @Override
-    public synchronized void onTransactionChainFailed(final TransactionChain chain, final Transaction transaction,
-            final Throwable cause) {
-        LOG.error("Transaction chain {} failed for tx {}",
-                chain, transaction != null ? transaction.getIdentifier() : null, cause);
+    public synchronized void onFailure(final Throwable cause) {
+        LOG.error("Transaction chain failed", cause);
 
         if (!closed.get()) {
             transactionChain.close();
-            transactionChain = dataBroker.createMergingTransactionChain(this);
+            transactionChain = dataBroker.createMergingTransactionChain();
+            transactionChain.addCallback(this);
         }
     }
 
     @Override
-    public void onTransactionChainSuccessful(final TransactionChain chain) {
-        LOG.debug("Transaction chain {} successful.", chain);
+    public void onSuccess(final Empty result) {
+        LOG.debug("Transaction chain successfu.");
     }
 }