2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.protocol.bgp.state;
10 import static java.util.Objects.requireNonNull;
11 import static java.util.concurrent.TimeUnit.SECONDS;
13 import com.google.common.util.concurrent.FutureCallback;
14 import com.google.common.util.concurrent.MoreExecutors;
15 import java.util.HashMap;
16 import java.util.HashSet;
17 import java.util.List;
20 import java.util.TimerTask;
21 import java.util.concurrent.Executors;
22 import java.util.concurrent.ScheduledExecutorService;
23 import java.util.concurrent.ScheduledFuture;
24 import java.util.concurrent.atomic.AtomicBoolean;
25 import java.util.stream.Collectors;
26 import org.checkerframework.checker.lock.qual.GuardedBy;
27 import org.eclipse.jdt.annotation.NonNull;
28 import org.opendaylight.mdsal.binding.api.DataBroker;
29 import org.opendaylight.mdsal.binding.api.Transaction;
30 import org.opendaylight.mdsal.binding.api.TransactionChain;
31 import org.opendaylight.mdsal.binding.api.TransactionChainListener;
32 import org.opendaylight.mdsal.binding.api.WriteTransaction;
33 import org.opendaylight.mdsal.common.api.CommitInfo;
34 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
35 import org.opendaylight.protocol.bgp.openconfig.spi.BGPTableTypeRegistryConsumer;
36 import org.opendaylight.protocol.bgp.rib.spi.state.BGPPeerState;
37 import org.opendaylight.protocol.bgp.rib.spi.state.BGPRibState;
38 import org.opendaylight.protocol.bgp.rib.spi.state.BGPStateConsumer;
39 import org.opendaylight.yang.gen.v1.http.openconfig.net.yang.bgp.rev151009.bgp.top.Bgp;
40 import org.opendaylight.yang.gen.v1.http.openconfig.net.yang.bgp.rev151009.bgp.top.BgpBuilder;
41 import org.opendaylight.yang.gen.v1.http.openconfig.net.yang.bgp.rev151009.bgp.top.bgp.Global;
42 import org.opendaylight.yang.gen.v1.http.openconfig.net.yang.bgp.rev151009.bgp.top.bgp.Neighbors;
43 import org.opendaylight.yang.gen.v1.http.openconfig.net.yang.bgp.rev151009.bgp.top.bgp.PeerGroups;
44 import org.opendaylight.yang.gen.v1.http.openconfig.net.yang.network.instance.rev151018.network.instance.top.NetworkInstances;
45 import org.opendaylight.yang.gen.v1.http.openconfig.net.yang.network.instance.rev151018.network.instance.top.network.instances.NetworkInstance;
46 import org.opendaylight.yang.gen.v1.http.openconfig.net.yang.network.instance.rev151018.network.instance.top.network.instances.NetworkInstanceKey;
47 import org.opendaylight.yang.gen.v1.http.openconfig.net.yang.network.instance.rev151018.network.instance.top.network.instances.network.instance.Protocols;
48 import org.opendaylight.yang.gen.v1.http.openconfig.net.yang.network.instance.rev151018.network.instance.top.network.instances.network.instance.protocols.Protocol;
49 import org.opendaylight.yang.gen.v1.http.openconfig.net.yang.network.instance.rev151018.network.instance.top.network.instances.network.instance.protocols.ProtocolKey;
50 import org.opendaylight.yang.gen.v1.http.openconfig.net.yang.policy.types.rev151009.BGP;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.openconfig.extensions.rev180329.NetworkInstanceProtocol;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev180329.bgp.rib.Rib;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev180329.bgp.rib.RibKey;
54 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
55 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
56 import org.slf4j.Logger;
57 import org.slf4j.LoggerFactory;
59 // This class is thread-safe
60 public final class StateProviderImpl implements TransactionChainListener, AutoCloseable {
61 private static final Logger LOG = LoggerFactory.getLogger(StateProviderImpl.class);
62 private final BGPStateConsumer stateCollector;
63 private final BGPTableTypeRegistryConsumer bgpTableTypeRegistry;
64 private final KeyedInstanceIdentifier<NetworkInstance, NetworkInstanceKey> networkInstanceIId;
65 private final int timeout;
66 private final DataBroker dataBroker;
68 private final Map<String, InstanceIdentifier<Bgp>> instanceIdentifiersCache = new HashMap<>();
70 private TransactionChain transactionChain;
72 private ScheduledFuture<?> scheduleTask;
73 private final ScheduledExecutorService scheduler;
74 private final AtomicBoolean closed = new AtomicBoolean(false);
76 public StateProviderImpl(final @NonNull DataBroker dataBroker, final int timeout,
77 final @NonNull BGPTableTypeRegistryConsumer bgpTableTypeRegistry,
78 final @NonNull BGPStateConsumer stateCollector, final @NonNull String networkInstanceName) {
79 this(dataBroker, timeout, bgpTableTypeRegistry, stateCollector, networkInstanceName,
80 Executors.newScheduledThreadPool(1));
83 public StateProviderImpl(final @NonNull DataBroker dataBroker, final int timeout,
84 final @NonNull BGPTableTypeRegistryConsumer bgpTableTypeRegistry,
85 final @NonNull BGPStateConsumer stateCollector,
86 final @NonNull String networkInstanceName, final @NonNull ScheduledExecutorService scheduler) {
87 this.dataBroker = requireNonNull(dataBroker);
88 this.bgpTableTypeRegistry = requireNonNull(bgpTableTypeRegistry);
89 this.stateCollector = requireNonNull(stateCollector);
90 this.networkInstanceIId = InstanceIdentifier.create(NetworkInstances.class)
91 .child(NetworkInstance.class, new NetworkInstanceKey(networkInstanceName));
92 this.timeout = timeout;
93 this.scheduler = scheduler;
96 public synchronized void init() {
97 this.transactionChain = this.dataBroker.createMergingTransactionChain(this);
98 final TimerTask task = new TimerTask() {
100 @SuppressWarnings("checkstyle:IllegalCatch")
102 synchronized (StateProviderImpl.this) {
103 final WriteTransaction wTx = StateProviderImpl.this.transactionChain.newWriteOnlyTransaction();
107 wTx.commit().addCallback(new FutureCallback<CommitInfo>() {
109 public void onSuccess(final CommitInfo result) {
110 LOG.debug("Successfully committed BGP stats update");
114 public void onFailure(final Throwable ex) {
115 LOG.error("Failed to commit BGP stats update", ex);
117 }, MoreExecutors.directExecutor());
118 } catch (final Exception e) {
119 LOG.warn("Failed to prepare Tx for BGP stats update", e);
126 this.scheduleTask = this.scheduler.scheduleAtFixedRate(task, 0, this.timeout, SECONDS);
129 private synchronized void updateBGPStats(final WriteTransaction wtx) {
130 final Set<String> oldStats = new HashSet<>(this.instanceIdentifiersCache.keySet());
131 this.stateCollector.getRibStats().stream().filter(BGPRibState::isActive).forEach(bgpStateConsumer -> {
132 final KeyedInstanceIdentifier<Rib, RibKey> ribId = bgpStateConsumer.getInstanceIdentifier();
133 final List<BGPPeerState> peerStats = this.stateCollector.getPeerStats().stream()
134 .filter(BGPPeerState::isActive).filter(peerState -> ribId.equals(peerState.getInstanceIdentifier()))
135 .collect(Collectors.toList());
136 storeOperationalState(bgpStateConsumer, peerStats, ribId.getKey().getId().getValue(), wtx);
137 oldStats.remove(ribId.getKey().getId().getValue());
139 oldStats.forEach(ribId -> removeStoredOperationalState(ribId, wtx));
142 private synchronized void removeStoredOperationalState(final String ribId, final WriteTransaction wtx) {
143 final InstanceIdentifier<Bgp> bgpIID = this.instanceIdentifiersCache.remove(ribId);
144 wtx.delete(LogicalDatastoreType.OPERATIONAL, bgpIID);
147 private synchronized void storeOperationalState(final BGPRibState bgpStateConsumer,
148 final List<BGPPeerState> peerStats, final String ribId, final WriteTransaction wtx) {
149 final Global global = GlobalUtil.buildGlobal(bgpStateConsumer, this.bgpTableTypeRegistry);
150 final PeerGroups peerGroups = PeerGroupUtil.buildPeerGroups(peerStats);
151 final Neighbors neighbors = NeighborUtil.buildNeighbors(peerStats, this.bgpTableTypeRegistry);
152 InstanceIdentifier<Bgp> bgpIID = this.instanceIdentifiersCache.get(ribId);
153 if (bgpIID == null) {
154 final ProtocolKey protocolKey = new ProtocolKey(BGP.class, bgpStateConsumer.getInstanceIdentifier()
155 .getKey().getId().getValue());
156 final KeyedInstanceIdentifier<Protocol, ProtocolKey> protocolIId = this.networkInstanceIId
157 .child(Protocols.class).child(Protocol.class, protocolKey);
158 bgpIID = protocolIId.augmentation(NetworkInstanceProtocol.class).child(Bgp.class);
159 this.instanceIdentifiersCache.put(ribId, bgpIID);
162 final Bgp bgp = new BgpBuilder().setGlobal(global).setNeighbors(neighbors).setPeerGroups(peerGroups).build();
163 wtx.put(LogicalDatastoreType.OPERATIONAL, bgpIID, bgp, WriteTransaction.CREATE_MISSING_PARENTS);
167 public synchronized void close() {
168 if (closed.compareAndSet(false, true)) {
169 this.scheduleTask.cancel(true);
170 if (!this.instanceIdentifiersCache.keySet().isEmpty()) {
171 final WriteTransaction wTx = this.transactionChain.newWriteOnlyTransaction();
172 this.instanceIdentifiersCache.keySet().iterator()
173 .forEachRemaining(ribId -> removeStoredOperationalState(ribId, wTx));
174 wTx.commit().addCallback(new FutureCallback<CommitInfo>() {
176 public void onSuccess(final CommitInfo result) {
177 LOG.trace("Successfully operational stats removed.");
181 public void onFailure(final Throwable throwable) {
182 LOG.error("Failed to clean up operational stats", throwable);
184 }, MoreExecutors.directExecutor());
186 this.transactionChain.close();
187 this.scheduler.shutdown();
192 public synchronized void onTransactionChainFailed(final TransactionChain chain, final Transaction transaction,
193 final Throwable cause) {
194 LOG.error("Transaction chain {} failed for tx {}",
195 chain, transaction != null ? transaction.getIdentifier() : null, cause);
198 transactionChain.close();
199 transactionChain = dataBroker.createMergingTransactionChain(this);
204 public void onTransactionChainSuccessful(final TransactionChain chain) {
205 LOG.debug("Transaction chain {} successful.", chain);