* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-
package org.opendaylight.protocol.bgp.state;
import static java.util.Objects.requireNonNull;
-import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.ListenableFuture;
-import io.netty.util.concurrent.GlobalEventExecutor;
-import io.netty.util.concurrent.ScheduledFuture;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.MoreExecutors;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TimerTask;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
-import javax.annotation.Nonnull;
-import javax.annotation.concurrent.GuardedBy;
-import javax.annotation.concurrent.ThreadSafe;
-import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
-import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
-import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonService;
-import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
-import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
-import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
+import javax.annotation.PreDestroy;
+import javax.inject.Inject;
+import javax.inject.Singleton;
+import org.checkerframework.checker.lock.qual.GuardedBy;
+import org.eclipse.jdt.annotation.NonNull;
+import org.opendaylight.mdsal.binding.api.DataBroker;
+import org.opendaylight.mdsal.binding.api.TransactionChain;
+import org.opendaylight.mdsal.binding.api.WriteOperations;
+import org.opendaylight.mdsal.binding.api.WriteTransaction;
+import org.opendaylight.mdsal.common.api.CommitInfo;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
import org.opendaylight.protocol.bgp.openconfig.spi.BGPTableTypeRegistryConsumer;
import org.opendaylight.protocol.bgp.rib.spi.state.BGPPeerState;
-import org.opendaylight.protocol.bgp.rib.spi.state.BGPRIBState;
-import org.opendaylight.protocol.bgp.rib.spi.state.BGPStateConsumer;
+import org.opendaylight.protocol.bgp.rib.spi.state.BGPRibState;
+import org.opendaylight.protocol.bgp.rib.spi.state.BGPStateProvider;
import org.opendaylight.yang.gen.v1.http.openconfig.net.yang.bgp.rev151009.bgp.top.Bgp;
import org.opendaylight.yang.gen.v1.http.openconfig.net.yang.bgp.rev151009.bgp.top.BgpBuilder;
import org.opendaylight.yang.gen.v1.http.openconfig.net.yang.bgp.rev151009.bgp.top.bgp.Global;
import org.opendaylight.yang.gen.v1.http.openconfig.net.yang.bgp.rev151009.bgp.top.bgp.Neighbors;
import org.opendaylight.yang.gen.v1.http.openconfig.net.yang.bgp.rev151009.bgp.top.bgp.PeerGroups;
+import org.opendaylight.yang.gen.v1.http.openconfig.net.yang.network.instance.rev151018.OpenconfigNetworkInstanceData;
import org.opendaylight.yang.gen.v1.http.openconfig.net.yang.network.instance.rev151018.network.instance.top.NetworkInstances;
import org.opendaylight.yang.gen.v1.http.openconfig.net.yang.network.instance.rev151018.network.instance.top.network.instances.NetworkInstance;
import org.opendaylight.yang.gen.v1.http.openconfig.net.yang.network.instance.rev151018.network.instance.top.network.instances.NetworkInstanceKey;
import org.opendaylight.yang.gen.v1.http.openconfig.net.yang.network.instance.rev151018.network.instance.top.network.instances.network.instance.protocols.Protocol;
import org.opendaylight.yang.gen.v1.http.openconfig.net.yang.network.instance.rev151018.network.instance.top.network.instances.network.instance.protocols.ProtocolKey;
import org.opendaylight.yang.gen.v1.http.openconfig.net.yang.policy.types.rev151009.BGP;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.openconfig.extensions.rev160614.Protocol1;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev130925.bgp.rib.Rib;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev130925.bgp.rib.RibKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.openconfig.extensions.rev180329.NetworkInstanceProtocol;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev180329.bgp.rib.Rib;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev180329.bgp.rib.RibKey;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
+import org.opendaylight.yangtools.yang.common.Empty;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.RequireServiceComponentRuntime;
+import org.osgi.service.metatype.annotations.AttributeDefinition;
+import org.osgi.service.metatype.annotations.Designate;
+import org.osgi.service.metatype.annotations.ObjectClassDefinition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-@ThreadSafe
-public final class StateProviderImpl implements TransactionChainListener, ClusterSingletonService, AutoCloseable {
+// This class is thread-safe
+@Singleton
+@Component(service = {})
+@Designate(ocd = StateProviderImpl.Configuration.class)
+@RequireServiceComponentRuntime
+public final class StateProviderImpl implements FutureCallback<Empty>, AutoCloseable {
+ @ObjectClassDefinition
+ public @interface Configuration {
+ @AttributeDefinition(description = "Name of the OpenConfig network instance to which to bind")
+ String networkInstanceName() default "global-bgp";
+
+ @AttributeDefinition(description = "Statistics update interval, in seconds", min = "1")
+ int updateIntervalSeconds() default 5;
+ }
+
private static final Logger LOG = LoggerFactory.getLogger(StateProviderImpl.class);
- private static final ServiceGroupIdentifier SERVICE_GROUP_IDENTIFIER = ServiceGroupIdentifier
- .create("bgp-state-provider-service-group");
- private final BGPStateConsumer stateCollector;
+
+ private final BGPStateProvider stateProvider;
private final BGPTableTypeRegistryConsumer bgpTableTypeRegistry;
private final KeyedInstanceIdentifier<NetworkInstance, NetworkInstanceKey> networkInstanceIId;
- private final int timeout;
private final DataBroker dataBroker;
@GuardedBy("this")
private final Map<String, InstanceIdentifier<Bgp>> instanceIdentifiersCache = new HashMap<>();
- private ClusterSingletonServiceRegistration singletonServiceRegistration;
@GuardedBy("this")
- private BindingTransactionChain transactionChain;
+ private TransactionChain transactionChain;
@GuardedBy("this")
- private ScheduledFuture<?> scheduleTask;
+ private final ScheduledFuture<?> scheduleTask;
+ private final ScheduledExecutorService scheduler;
+ private final AtomicBoolean closed = new AtomicBoolean(false);
+
+ @Activate
+ public StateProviderImpl(@Reference final @NonNull DataBroker dataBroker,
+ @Reference final @NonNull BGPTableTypeRegistryConsumer bgpTableTypeRegistry,
+ @Reference final @NonNull BGPStateProvider stateProvider, final @NonNull Configuration configuration) {
+ this(dataBroker, configuration.updateIntervalSeconds(), bgpTableTypeRegistry, stateProvider,
+ configuration.networkInstanceName());
+ }
- public StateProviderImpl(@Nonnull final DataBroker dataBroker, final int timeout,
- @Nonnull BGPTableTypeRegistryConsumer bgpTableTypeRegistry, @Nonnull final BGPStateConsumer stateCollector,
- @Nonnull final String networkInstanceName, @Nonnull final ClusterSingletonServiceProvider provider) {
- this.dataBroker = requireNonNull(dataBroker);
- this.bgpTableTypeRegistry = requireNonNull(bgpTableTypeRegistry);
- this.stateCollector = requireNonNull(stateCollector);
- this.networkInstanceIId = InstanceIdentifier.create(NetworkInstances.class)
- .child(NetworkInstance.class, new NetworkInstanceKey(networkInstanceName));
- this.timeout = timeout;
- this.singletonServiceRegistration = requireNonNull(provider)
- .registerClusterSingletonService(this);
+ @Inject
+ public StateProviderImpl(final @NonNull DataBroker dataBroker, final int timeout,
+ final @NonNull BGPTableTypeRegistryConsumer bgpTableTypeRegistry,
+ final @NonNull BGPStateProvider stateProvider,
+ final @NonNull String networkInstanceName) {
+ this(dataBroker, timeout, TimeUnit.SECONDS, bgpTableTypeRegistry, stateProvider, networkInstanceName,
+ Executors.newScheduledThreadPool(1));
}
- @Override
- public synchronized void instantiateServiceInstance() {
- this.transactionChain = this.dataBroker.createTransactionChain(this);
+ @VisibleForTesting
+ StateProviderImpl(final @NonNull DataBroker dataBroker, final long period, final TimeUnit timeUnit,
+ final @NonNull BGPTableTypeRegistryConsumer bgpTableTypeRegistry,
+ final @NonNull BGPStateProvider stateProvider,
+ final @NonNull String networkInstanceName, final @NonNull ScheduledExecutorService scheduler) {
+ this.dataBroker = requireNonNull(dataBroker);
+ this.bgpTableTypeRegistry = requireNonNull(bgpTableTypeRegistry);
+ this.stateProvider = requireNonNull(stateProvider);
+ networkInstanceIId =
+ InstanceIdentifier.builderOfInherited(OpenconfigNetworkInstanceData.class, NetworkInstances.class).build()
+ .child(NetworkInstance.class, new NetworkInstanceKey(networkInstanceName));
+ this.scheduler = scheduler;
+
+ transactionChain = this.dataBroker.createMergingTransactionChain();
+ transactionChain.addCallback(this);
final TimerTask task = new TimerTask() {
@Override
+ @SuppressWarnings("checkstyle:IllegalCatch")
public void run() {
synchronized (StateProviderImpl.this) {
- final WriteTransaction wTx = StateProviderImpl.this.transactionChain.newWriteOnlyTransaction();
+ final WriteTransaction wTx = transactionChain.newWriteOnlyTransaction();
try {
updateBGPStats(wTx);
} catch (final Exception e) {
- LOG.warn("Failed to update BGP Stats", e);
- } finally {
- wTx.submit();
+ LOG.warn("Failed to prepare Tx for BGP stats update", e);
+ wTx.cancel();
+ return;
}
+
+ wTx.commit().addCallback(new FutureCallback<CommitInfo>() {
+ @Override
+ public void onSuccess(final CommitInfo result) {
+ LOG.debug("Successfully committed BGP stats update");
+ }
+
+ @Override
+ public void onFailure(final Throwable ex) {
+ LOG.error("Failed to commit BGP stats update", ex);
+ }
+ }, MoreExecutors.directExecutor());
}
}
};
- this.scheduleTask = GlobalEventExecutor.INSTANCE.scheduleAtFixedRate(task, 0, this.timeout,
- TimeUnit.SECONDS);
+ scheduleTask = this.scheduler.scheduleAtFixedRate(task, 0, period, timeUnit);
}
- private synchronized void updateBGPStats(final WriteTransaction wTx) {
- final Set<String> oldStats = new HashSet<>(this.instanceIdentifiersCache.keySet());
- this.stateCollector.getRibStats()
- .forEach(bgpStateConsumer -> {
- final KeyedInstanceIdentifier<Rib, RibKey> ribId = bgpStateConsumer.getInstanceIdentifier();
- final List<BGPPeerState> peerStats = this.stateCollector.getPeerStats().stream()
- .filter(peerState -> ribId.equals(peerState.getInstanceIdentifier())).collect(Collectors.toList());
- storeOperationalState(bgpStateConsumer, peerStats, ribId.getKey().getId().getValue(), wTx);
- oldStats.remove(ribId.getKey().getId().getValue());
- });
- oldStats.forEach(ribId -> removeStoredOperationalState(ribId, wTx));
+ private synchronized void updateBGPStats(final WriteOperations wtx) {
+ final Set<String> oldStats = new HashSet<>(instanceIdentifiersCache.keySet());
+ stateProvider.getRibStats().stream().filter(BGPRibState::isActive).forEach(bgpStateConsumer -> {
+ final KeyedInstanceIdentifier<Rib, RibKey> ribId = bgpStateConsumer.getInstanceIdentifier();
+ final List<BGPPeerState> peerStats = stateProvider.getPeerStats().stream()
+ .filter(BGPPeerState::isActive).filter(peerState -> ribId.equals(peerState.getInstanceIdentifier()))
+ .collect(Collectors.toList());
+ storeOperationalState(bgpStateConsumer, peerStats, ribId.getKey().getId().getValue(), wtx);
+ oldStats.remove(ribId.getKey().getId().getValue());
+ });
+ oldStats.forEach(ribId -> removeStoredOperationalState(ribId, wtx));
}
- private synchronized void removeStoredOperationalState(final String ribId, final WriteTransaction wTx) {
- final InstanceIdentifier<Bgp> bgpIID = this.instanceIdentifiersCache.remove(ribId);
- wTx.delete(LogicalDatastoreType.OPERATIONAL, bgpIID);
+ private synchronized void removeStoredOperationalState(final String ribId, final WriteOperations wtx) {
+ final InstanceIdentifier<Bgp> bgpIID = instanceIdentifiersCache.remove(ribId);
+ wtx.delete(LogicalDatastoreType.OPERATIONAL, bgpIID);
}
- private synchronized void storeOperationalState(final BGPRIBState bgpStateConsumer,
- final List<BGPPeerState> peerStats, final String ribId, final WriteTransaction wTx) {
- final Global global = GlobalUtil.buildGlobal(bgpStateConsumer, this.bgpTableTypeRegistry);
+ private synchronized void storeOperationalState(final BGPRibState bgpStateConsumer,
+ final List<BGPPeerState> peerStats, final String ribId, final WriteOperations wtx) {
+ final Global global = GlobalUtil.buildGlobal(bgpStateConsumer, bgpTableTypeRegistry);
final PeerGroups peerGroups = PeerGroupUtil.buildPeerGroups(peerStats);
- final Neighbors neighbors = NeighborUtil.buildNeighbors(peerStats, this.bgpTableTypeRegistry);
- InstanceIdentifier<Bgp> bgpIID = this.instanceIdentifiersCache.get(ribId);
+ final Neighbors neighbors = NeighborUtil.buildNeighbors(peerStats, bgpTableTypeRegistry);
+ InstanceIdentifier<Bgp> bgpIID = instanceIdentifiersCache.get(ribId);
if (bgpIID == null) {
- final ProtocolKey protocolKey = new ProtocolKey(BGP.class, bgpStateConsumer.getInstanceIdentifier()
- .getKey().getId().getValue());
- final KeyedInstanceIdentifier<Protocol, ProtocolKey> protocolIId = this.networkInstanceIId
- .child(Protocols.class).child(Protocol.class, protocolKey);
- bgpIID = protocolIId.augmentation(Protocol1.class).child(Bgp.class);
- this.instanceIdentifiersCache.put(ribId, bgpIID);
+ final ProtocolKey protocolKey = new ProtocolKey(BGP.VALUE, bgpStateConsumer.getInstanceIdentifier()
+ .getKey().getId().getValue());
+ final KeyedInstanceIdentifier<Protocol, ProtocolKey> protocolIId = networkInstanceIId
+ .child(Protocols.class).child(Protocol.class, protocolKey);
+ bgpIID = protocolIId.augmentation(NetworkInstanceProtocol.class).child(Bgp.class);
+ instanceIdentifiersCache.put(ribId, bgpIID);
}
final Bgp bgp = new BgpBuilder().setGlobal(global).setNeighbors(neighbors).setPeerGroups(peerGroups).build();
- wTx.put(LogicalDatastoreType.OPERATIONAL, bgpIID, bgp, WriteTransaction.CREATE_MISSING_PARENTS);
+ wtx.mergeParentStructurePut(LogicalDatastoreType.OPERATIONAL, bgpIID, bgp);
}
+ @Deactivate
+ @PreDestroy
@Override
- public void close() throws Exception {
- if (this.singletonServiceRegistration != null) {
- this.singletonServiceRegistration.close();
- this.singletonServiceRegistration = null;
- }
- }
+ public synchronized void close() {
+ if (closed.compareAndSet(false, true)) {
+ scheduleTask.cancel(true);
+ if (!instanceIdentifiersCache.isEmpty()) {
+ final WriteTransaction wTx = transactionChain.newWriteOnlyTransaction();
+ instanceIdentifiersCache.values()
+ .forEach(bgpIID -> wTx.delete(LogicalDatastoreType.OPERATIONAL, bgpIID));
+ instanceIdentifiersCache.clear();
+ wTx.commit().addCallback(new FutureCallback<CommitInfo>() {
+ @Override
+ public void onSuccess(final CommitInfo result) {
+ LOG.trace("Successfully operational stats removed.");
+ }
- @Override
- public void onTransactionChainFailed(final TransactionChain<?, ?> chain, final AsyncTransaction<?, ?> transaction,
- final Throwable cause) {
- LOG.error("Transaction chain failed {}.", transaction != null ? transaction.getIdentifier() : null, cause);
+ @Override
+ public void onFailure(final Throwable throwable) {
+ LOG.error("Failed to clean up operational stats", throwable);
+ }
+ }, MoreExecutors.directExecutor());
+ }
+ transactionChain.close();
+ scheduler.shutdown();
+ }
}
@Override
- public void onTransactionChainSuccessful(final TransactionChain<?, ?> chain) {
- LOG.debug("Transaction chain {} successful.", chain);
- }
+ public synchronized void onFailure(final Throwable cause) {
+ LOG.error("Transaction chain failed", cause);
- @Override
- public synchronized ListenableFuture<Void> closeServiceInstance() {
- this.scheduleTask.cancel(true);
- final WriteTransaction wTx = this.transactionChain.newWriteOnlyTransaction();
- this.instanceIdentifiersCache.keySet().forEach(ribId -> removeStoredOperationalState(ribId, wTx));
- final CheckedFuture<Void, TransactionCommitFailedException> futureDelete = wTx.submit();
- this.transactionChain.close();
- return futureDelete;
+ if (!closed.get()) {
+ transactionChain.close();
+ transactionChain = dataBroker.createMergingTransactionChain();
+ transactionChain.addCallback(this);
+ }
}
@Override
- public ServiceGroupIdentifier getIdentifier() {
- return SERVICE_GROUP_IDENTIFIER;
+ public void onSuccess(final Empty result) {
+ LOG.debug("Transaction chain successfu.");
}
}