2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.protocol.bgp.state;
11 import static java.util.Objects.requireNonNull;
12 import static java.util.concurrent.TimeUnit.SECONDS;
14 import com.google.common.util.concurrent.FutureCallback;
15 import com.google.common.util.concurrent.MoreExecutors;
16 import java.util.HashMap;
17 import java.util.HashSet;
18 import java.util.List;
21 import java.util.TimerTask;
22 import java.util.concurrent.Executors;
23 import java.util.concurrent.ScheduledExecutorService;
24 import java.util.concurrent.ScheduledFuture;
25 import java.util.concurrent.atomic.AtomicBoolean;
26 import java.util.stream.Collectors;
27 import javax.annotation.Nonnull;
28 import javax.annotation.concurrent.GuardedBy;
29 import javax.annotation.concurrent.ThreadSafe;
30 import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
31 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
32 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
33 import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
34 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
35 import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
36 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
37 import org.opendaylight.mdsal.common.api.CommitInfo;
38 import org.opendaylight.protocol.bgp.openconfig.spi.BGPTableTypeRegistryConsumer;
39 import org.opendaylight.protocol.bgp.rib.spi.state.BGPPeerState;
40 import org.opendaylight.protocol.bgp.rib.spi.state.BGPRibState;
41 import org.opendaylight.protocol.bgp.rib.spi.state.BGPStateConsumer;
42 import org.opendaylight.yang.gen.v1.http.openconfig.net.yang.bgp.rev151009.bgp.top.Bgp;
43 import org.opendaylight.yang.gen.v1.http.openconfig.net.yang.bgp.rev151009.bgp.top.BgpBuilder;
44 import org.opendaylight.yang.gen.v1.http.openconfig.net.yang.bgp.rev151009.bgp.top.bgp.Global;
45 import org.opendaylight.yang.gen.v1.http.openconfig.net.yang.bgp.rev151009.bgp.top.bgp.Neighbors;
46 import org.opendaylight.yang.gen.v1.http.openconfig.net.yang.bgp.rev151009.bgp.top.bgp.PeerGroups;
47 import org.opendaylight.yang.gen.v1.http.openconfig.net.yang.network.instance.rev151018.network.instance.top.NetworkInstances;
48 import org.opendaylight.yang.gen.v1.http.openconfig.net.yang.network.instance.rev151018.network.instance.top.network.instances.NetworkInstance;
49 import org.opendaylight.yang.gen.v1.http.openconfig.net.yang.network.instance.rev151018.network.instance.top.network.instances.NetworkInstanceKey;
50 import org.opendaylight.yang.gen.v1.http.openconfig.net.yang.network.instance.rev151018.network.instance.top.network.instances.network.instance.Protocols;
51 import org.opendaylight.yang.gen.v1.http.openconfig.net.yang.network.instance.rev151018.network.instance.top.network.instances.network.instance.protocols.Protocol;
52 import org.opendaylight.yang.gen.v1.http.openconfig.net.yang.network.instance.rev151018.network.instance.top.network.instances.network.instance.protocols.ProtocolKey;
53 import org.opendaylight.yang.gen.v1.http.openconfig.net.yang.policy.types.rev151009.BGP;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.openconfig.extensions.rev180329.NetworkInstanceProtocol;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev180329.bgp.rib.Rib;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev180329.bgp.rib.RibKey;
57 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
58 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
59 import org.slf4j.Logger;
60 import org.slf4j.LoggerFactory;
63 public final class StateProviderImpl implements TransactionChainListener, AutoCloseable {
/** Logger for this class. */
64 private static final Logger LOG = LoggerFactory.getLogger(StateProviderImpl.class);
/** Source of the RIB and peer state that is read on every statistics update. */
65 private final BGPStateConsumer stateCollector;
/** Table type registry handed to the Global and Neighbors builders. */
66 private final BGPTableTypeRegistryConsumer bgpTableTypeRegistry;
/** Identifier of the network instance under which all BGP protocol entries are written. */
67 private final KeyedInstanceIdentifier<NetworkInstance, NetworkInstanceKey> networkInstanceIId;
/** Period, in seconds, of the scheduled statistics update. */
68 private final int timeout;
/** Broker used to create (and, after a chain failure, re-create) the transaction chain. */
69 private final DataBroker dataBroker;
/**
 * Maps a RIB id to the Bgp identifier it was last written under. Entries are added when a RIB is
 * stored and removed when its stored state is deleted.
 */
71 private final Map<String, InstanceIdentifier<Bgp>> instanceIdentifiersCache = new HashMap<>();
/** Chain used for all writes; assigned in init() and replaced when the chain reports a failure. */
73 private BindingTransactionChain transactionChain;
/** Handle of the periodic update task; assigned in init() and cancelled in close(). */
75 private ScheduledFuture<?> scheduleTask;
/** Executor that runs the periodic update task; shut down in close(). */
76 private final ScheduledExecutorService scheduler;
/** Flipped to true by the first call to close(), so the shutdown sequence runs only once. */
77 private final AtomicBoolean closed = new AtomicBoolean(false);
/**
 * Convenience constructor: delegates to the six-argument constructor, supplying a freshly created
 * scheduled thread pool with a single thread as the scheduler.
 *
 * @param dataBroker           data broker passed through to the delegate constructor
 * @param timeout              update period passed through to the delegate constructor
 * @param bgpTableTypeRegistry table type registry passed through to the delegate constructor
 * @param stateCollector       state consumer passed through to the delegate constructor
 * @param networkInstanceName  network instance name passed through to the delegate constructor
 */
public StateProviderImpl(
        @Nonnull final DataBroker dataBroker,
        final int timeout,
        @Nonnull final BGPTableTypeRegistryConsumer bgpTableTypeRegistry,
        @Nonnull final BGPStateConsumer stateCollector,
        @Nonnull final String networkInstanceName) {
    this(dataBroker, timeout, bgpTableTypeRegistry, stateCollector, networkInstanceName,
            Executors.newScheduledThreadPool(1));
}
/**
 * Creates a provider that uses the supplied scheduler for its periodic update task.
 *
 * @param dataBroker           broker used to create transaction chains; must not be null
 * @param timeout              period, in seconds, between two scheduled updates
 * @param bgpTableTypeRegistry table type registry used when building the operational data; must not be null
 * @param stateCollector       provider of RIB and peer state; must not be null
 * @param networkInstanceName  key of the network instance under which the state is written
 * @param scheduler            executor on which the periodic update is scheduled; must not be null
 * @throws NullPointerException if {@code dataBroker}, {@code bgpTableTypeRegistry},
 *                              {@code stateCollector} or {@code scheduler} is null
 */
public StateProviderImpl(@Nonnull final DataBroker dataBroker, final int timeout,
        @Nonnull final BGPTableTypeRegistryConsumer bgpTableTypeRegistry,
        @Nonnull final BGPStateConsumer stateCollector,
        @Nonnull final String networkInstanceName,
        @Nonnull final ScheduledExecutorService scheduler) {
    this.dataBroker = requireNonNull(dataBroker);
    this.bgpTableTypeRegistry = requireNonNull(bgpTableTypeRegistry);
    this.stateCollector = requireNonNull(stateCollector);
    this.networkInstanceIId = InstanceIdentifier.create(NetworkInstances.class)
            .child(NetworkInstance.class, new NetworkInstanceKey(networkInstanceName));
    this.timeout = timeout;
    // Checked here like the other collaborators, so a null fails at construction rather than in init().
    this.scheduler = requireNonNull(scheduler);
}
101 public synchronized void init() {
102 this.transactionChain = this.dataBroker.createTransactionChain(this);
103 final TimerTask task = new TimerTask() {
105 @SuppressWarnings("checkstyle:IllegalCatch")
107 synchronized (StateProviderImpl.this) {
108 final WriteTransaction wTx = StateProviderImpl.this.transactionChain.newWriteOnlyTransaction();
112 wTx.commit().addCallback(new FutureCallback<CommitInfo>() {
114 public void onSuccess(CommitInfo result) {
115 LOG.debug("Successfully committed BGP stats update");
119 public void onFailure(Throwable ex) {
120 LOG.error("Failed to commit BGP stats update", ex);
122 }, MoreExecutors.directExecutor());
123 } catch (final Exception e) {
124 LOG.warn("Failed to prepare Tx for BGP stats update", e);
131 this.scheduleTask = this.scheduler.scheduleAtFixedRate(task, 0, this.timeout, SECONDS);
134 private synchronized void updateBGPStats(final WriteTransaction wtx) {
135 final Set<String> oldStats = new HashSet<>(this.instanceIdentifiersCache.keySet());
136 this.stateCollector.getRibStats().stream().filter(BGPRibState::isActive).forEach(bgpStateConsumer -> {
137 final KeyedInstanceIdentifier<Rib, RibKey> ribId = bgpStateConsumer.getInstanceIdentifier();
138 final List<BGPPeerState> peerStats = this.stateCollector.getPeerStats().stream()
139 .filter(BGPPeerState::isActive).filter(peerState -> ribId.equals(peerState.getInstanceIdentifier()))
140 .collect(Collectors.toList());
141 storeOperationalState(bgpStateConsumer, peerStats, ribId.getKey().getId().getValue(), wtx);
142 oldStats.remove(ribId.getKey().getId().getValue());
144 oldStats.forEach(ribId -> removeStoredOperationalState(ribId, wtx));
147 private synchronized void removeStoredOperationalState(final String ribId, final WriteTransaction wtx) {
148 final InstanceIdentifier<Bgp> bgpIID = this.instanceIdentifiersCache.remove(ribId);
149 wtx.delete(LogicalDatastoreType.OPERATIONAL, bgpIID);
152 private synchronized void storeOperationalState(final BGPRibState bgpStateConsumer,
153 final List<BGPPeerState> peerStats, final String ribId, final WriteTransaction wtx) {
154 final Global global = GlobalUtil.buildGlobal(bgpStateConsumer, this.bgpTableTypeRegistry);
155 final PeerGroups peerGroups = PeerGroupUtil.buildPeerGroups(peerStats);
156 final Neighbors neighbors = NeighborUtil.buildNeighbors(peerStats, this.bgpTableTypeRegistry);
157 InstanceIdentifier<Bgp> bgpIID = this.instanceIdentifiersCache.get(ribId);
158 if (bgpIID == null) {
159 final ProtocolKey protocolKey = new ProtocolKey(BGP.class, bgpStateConsumer.getInstanceIdentifier()
160 .getKey().getId().getValue());
161 final KeyedInstanceIdentifier<Protocol, ProtocolKey> protocolIId = this.networkInstanceIId
162 .child(Protocols.class).child(Protocol.class, protocolKey);
163 bgpIID = protocolIId.augmentation(NetworkInstanceProtocol.class).child(Bgp.class);
164 this.instanceIdentifiersCache.put(ribId, bgpIID);
167 final Bgp bgp = new BgpBuilder().setGlobal(global).setNeighbors(neighbors).setPeerGroups(peerGroups).build();
168 wtx.put(LogicalDatastoreType.OPERATIONAL, bgpIID, bgp, WriteTransaction.CREATE_MISSING_PARENTS);
172 public synchronized void close() {
173 if (closed.compareAndSet(false, true)) {
174 this.scheduleTask.cancel(true);
175 if (!this.instanceIdentifiersCache.keySet().isEmpty()) {
176 final WriteTransaction wTx = this.transactionChain.newWriteOnlyTransaction();
177 this.instanceIdentifiersCache.keySet().iterator()
178 .forEachRemaining(ribId -> removeStoredOperationalState(ribId, wTx));
179 wTx.commit().addCallback(new FutureCallback<CommitInfo>() {
181 public void onSuccess(final CommitInfo result) {
182 LOG.trace("Successfully operational stats removed.");
186 public void onFailure(final Throwable throwable) {
187 LOG.error("Failed to clean up operational stats", throwable);
189 }, MoreExecutors.directExecutor());
191 this.transactionChain.close();
192 this.scheduler.shutdown();
197 public synchronized void onTransactionChainFailed(final TransactionChain<?, ?> chain,
198 final AsyncTransaction<?, ?> transaction, final Throwable cause) {
199 LOG.error("Transaction chain {} failed for tx {}",
200 chain, transaction != null ? transaction.getIdentifier() : null, cause);
203 transactionChain.close();
204 transactionChain = dataBroker.createTransactionChain(this);
209 public void onTransactionChainSuccessful(final TransactionChain<?, ?> chain) {
210 LOG.debug("Transaction chain {} successful.", chain);