Bump odlparent to 6.0.0
[bgpcep.git] / bgp / openconfig-state / src / main / java / org / opendaylight / protocol / bgp / state / StateProviderImpl.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.protocol.bgp.state;
9
10 import static java.util.Objects.requireNonNull;
11 import static java.util.concurrent.TimeUnit.SECONDS;
12
13 import com.google.common.util.concurrent.FutureCallback;
14 import com.google.common.util.concurrent.MoreExecutors;
15 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
16 import java.util.HashMap;
17 import java.util.HashSet;
18 import java.util.List;
19 import java.util.Map;
20 import java.util.Set;
21 import java.util.TimerTask;
22 import java.util.concurrent.Executors;
23 import java.util.concurrent.ScheduledExecutorService;
24 import java.util.concurrent.ScheduledFuture;
25 import java.util.concurrent.atomic.AtomicBoolean;
26 import java.util.stream.Collectors;
27 import org.checkerframework.checker.lock.qual.GuardedBy;
28 import org.eclipse.jdt.annotation.NonNull;
29 import org.opendaylight.mdsal.binding.api.DataBroker;
30 import org.opendaylight.mdsal.binding.api.Transaction;
31 import org.opendaylight.mdsal.binding.api.TransactionChain;
32 import org.opendaylight.mdsal.binding.api.TransactionChainListener;
33 import org.opendaylight.mdsal.binding.api.WriteTransaction;
34 import org.opendaylight.mdsal.common.api.CommitInfo;
35 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
36 import org.opendaylight.protocol.bgp.openconfig.spi.BGPTableTypeRegistryConsumer;
37 import org.opendaylight.protocol.bgp.rib.spi.state.BGPPeerState;
38 import org.opendaylight.protocol.bgp.rib.spi.state.BGPRibState;
39 import org.opendaylight.protocol.bgp.rib.spi.state.BGPStateConsumer;
40 import org.opendaylight.yang.gen.v1.http.openconfig.net.yang.bgp.rev151009.bgp.top.Bgp;
41 import org.opendaylight.yang.gen.v1.http.openconfig.net.yang.bgp.rev151009.bgp.top.BgpBuilder;
42 import org.opendaylight.yang.gen.v1.http.openconfig.net.yang.bgp.rev151009.bgp.top.bgp.Global;
43 import org.opendaylight.yang.gen.v1.http.openconfig.net.yang.bgp.rev151009.bgp.top.bgp.Neighbors;
44 import org.opendaylight.yang.gen.v1.http.openconfig.net.yang.bgp.rev151009.bgp.top.bgp.PeerGroups;
45 import org.opendaylight.yang.gen.v1.http.openconfig.net.yang.network.instance.rev151018.network.instance.top.NetworkInstances;
46 import org.opendaylight.yang.gen.v1.http.openconfig.net.yang.network.instance.rev151018.network.instance.top.network.instances.NetworkInstance;
47 import org.opendaylight.yang.gen.v1.http.openconfig.net.yang.network.instance.rev151018.network.instance.top.network.instances.NetworkInstanceKey;
48 import org.opendaylight.yang.gen.v1.http.openconfig.net.yang.network.instance.rev151018.network.instance.top.network.instances.network.instance.Protocols;
49 import org.opendaylight.yang.gen.v1.http.openconfig.net.yang.network.instance.rev151018.network.instance.top.network.instances.network.instance.protocols.Protocol;
50 import org.opendaylight.yang.gen.v1.http.openconfig.net.yang.network.instance.rev151018.network.instance.top.network.instances.network.instance.protocols.ProtocolKey;
51 import org.opendaylight.yang.gen.v1.http.openconfig.net.yang.policy.types.rev151009.BGP;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.openconfig.extensions.rev180329.NetworkInstanceProtocol;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev180329.bgp.rib.Rib;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev180329.bgp.rib.RibKey;
55 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
56 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
57 import org.slf4j.Logger;
58 import org.slf4j.LoggerFactory;
59
60 // This class is thread-safe
61 public final class StateProviderImpl implements TransactionChainListener, AutoCloseable {
62     private static final Logger LOG = LoggerFactory.getLogger(StateProviderImpl.class);
63     private final BGPStateConsumer stateCollector;
64     private final BGPTableTypeRegistryConsumer bgpTableTypeRegistry;
65     private final KeyedInstanceIdentifier<NetworkInstance, NetworkInstanceKey> networkInstanceIId;
66     private final int timeout;
67     private final DataBroker dataBroker;
68     @GuardedBy("this")
69     private final Map<String, InstanceIdentifier<Bgp>> instanceIdentifiersCache = new HashMap<>();
70     @GuardedBy("this")
71     private TransactionChain transactionChain;
72     @GuardedBy("this")
73     private ScheduledFuture<?> scheduleTask;
74     private final ScheduledExecutorService scheduler;
75     private final AtomicBoolean closed = new AtomicBoolean(false);
76
77     public StateProviderImpl(final @NonNull DataBroker dataBroker, final int timeout,
78             final @NonNull BGPTableTypeRegistryConsumer bgpTableTypeRegistry,
79             final @NonNull BGPStateConsumer stateCollector, final @NonNull String networkInstanceName) {
80         this(dataBroker, timeout, bgpTableTypeRegistry, stateCollector, networkInstanceName,
81                 Executors.newScheduledThreadPool(1));
82     }
83
84     public StateProviderImpl(final @NonNull DataBroker dataBroker, final int timeout,
85             final @NonNull BGPTableTypeRegistryConsumer bgpTableTypeRegistry,
86             final @NonNull BGPStateConsumer stateCollector,
87             final @NonNull String networkInstanceName, final @NonNull ScheduledExecutorService scheduler) {
88         this.dataBroker = requireNonNull(dataBroker);
89         this.bgpTableTypeRegistry = requireNonNull(bgpTableTypeRegistry);
90         this.stateCollector = requireNonNull(stateCollector);
91         this.networkInstanceIId = InstanceIdentifier.create(NetworkInstances.class)
92                 .child(NetworkInstance.class, new NetworkInstanceKey(networkInstanceName));
93         this.timeout = timeout;
94         this.scheduler = scheduler;
95     }
96
97     public synchronized void init() {
98         this.transactionChain = this.dataBroker.createMergingTransactionChain(this);
99         final TimerTask task = new TimerTask() {
100             @Override
101             @SuppressWarnings("checkstyle:IllegalCatch")
102             public void run() {
103                 synchronized (StateProviderImpl.this) {
104                     final WriteTransaction wTx = StateProviderImpl.this.transactionChain.newWriteOnlyTransaction();
105                     try {
106                         updateBGPStats(wTx);
107
108                         wTx.commit().addCallback(new FutureCallback<CommitInfo>() {
109                             @Override
110                             public void onSuccess(final CommitInfo result) {
111                                 LOG.debug("Successfully committed BGP stats update");
112                             }
113
114                             @Override
115                             public void onFailure(final Throwable ex) {
116                                 LOG.error("Failed to commit BGP stats update", ex);
117                             }
118                         }, MoreExecutors.directExecutor());
119                     } catch (final Exception e) {
120                         LOG.warn("Failed to prepare Tx for BGP stats update", e);
121                         wTx.cancel();
122                     }
123                 }
124             }
125         };
126
127         this.scheduleTask = this.scheduler.scheduleAtFixedRate(task, 0, this.timeout, SECONDS);
128     }
129
130     @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
131             justification = "https://github.com/spotbugs/spotbugs/issues/811")
132     private synchronized void updateBGPStats(final WriteTransaction wtx) {
133         final Set<String> oldStats = new HashSet<>(this.instanceIdentifiersCache.keySet());
134         this.stateCollector.getRibStats().stream().filter(BGPRibState::isActive).forEach(bgpStateConsumer -> {
135             final KeyedInstanceIdentifier<Rib, RibKey> ribId = bgpStateConsumer.getInstanceIdentifier();
136             final List<BGPPeerState> peerStats = this.stateCollector.getPeerStats().stream()
137                     .filter(BGPPeerState::isActive).filter(peerState -> ribId.equals(peerState.getInstanceIdentifier()))
138                     .collect(Collectors.toList());
139             storeOperationalState(bgpStateConsumer, peerStats, ribId.getKey().getId().getValue(), wtx);
140             oldStats.remove(ribId.getKey().getId().getValue());
141         });
142         oldStats.forEach(ribId -> removeStoredOperationalState(ribId, wtx));
143     }
144
145     private synchronized void removeStoredOperationalState(final String ribId, final WriteTransaction wtx) {
146         final InstanceIdentifier<Bgp> bgpIID = this.instanceIdentifiersCache.remove(ribId);
147         wtx.delete(LogicalDatastoreType.OPERATIONAL, bgpIID);
148     }
149
150     private synchronized void storeOperationalState(final BGPRibState bgpStateConsumer,
151             final List<BGPPeerState> peerStats, final String ribId, final WriteTransaction wtx) {
152         final Global global = GlobalUtil.buildGlobal(bgpStateConsumer, this.bgpTableTypeRegistry);
153         final PeerGroups peerGroups = PeerGroupUtil.buildPeerGroups(peerStats);
154         final Neighbors neighbors = NeighborUtil.buildNeighbors(peerStats, this.bgpTableTypeRegistry);
155         InstanceIdentifier<Bgp> bgpIID = this.instanceIdentifiersCache.get(ribId);
156         if (bgpIID == null) {
157             final ProtocolKey protocolKey = new ProtocolKey(BGP.class, bgpStateConsumer.getInstanceIdentifier()
158                     .getKey().getId().getValue());
159             final KeyedInstanceIdentifier<Protocol, ProtocolKey> protocolIId = this.networkInstanceIId
160                     .child(Protocols.class).child(Protocol.class, protocolKey);
161             bgpIID = protocolIId.augmentation(NetworkInstanceProtocol.class).child(Bgp.class);
162             this.instanceIdentifiersCache.put(ribId, bgpIID);
163         }
164
165         final Bgp bgp = new BgpBuilder().setGlobal(global).setNeighbors(neighbors).setPeerGroups(peerGroups).build();
166         wtx.mergeParentStructurePut(LogicalDatastoreType.OPERATIONAL, bgpIID, bgp);
167     }
168
169     @Override
170     public synchronized void close() {
171         if (closed.compareAndSet(false, true)) {
172             this.scheduleTask.cancel(true);
173             if (!this.instanceIdentifiersCache.keySet().isEmpty()) {
174                 final WriteTransaction wTx = this.transactionChain.newWriteOnlyTransaction();
175                 this.instanceIdentifiersCache.keySet().iterator()
176                 .forEachRemaining(ribId -> removeStoredOperationalState(ribId, wTx));
177                 wTx.commit().addCallback(new FutureCallback<CommitInfo>() {
178                     @Override
179                     public void onSuccess(final CommitInfo result) {
180                         LOG.trace("Successfully operational stats removed.");
181                     }
182
183                     @Override
184                     public void onFailure(final Throwable throwable) {
185                         LOG.error("Failed to clean up operational stats", throwable);
186                     }
187                 }, MoreExecutors.directExecutor());
188             }
189             this.transactionChain.close();
190             this.scheduler.shutdown();
191         }
192     }
193
194     @Override
195     public synchronized void onTransactionChainFailed(final TransactionChain chain, final Transaction transaction,
196             final Throwable cause) {
197         LOG.error("Transaction chain {} failed for tx {}",
198                 chain, transaction != null ? transaction.getIdentifier() : null, cause);
199
200         if (!closed.get()) {
201             transactionChain.close();
202             transactionChain = dataBroker.createMergingTransactionChain(this);
203         }
204     }
205
206     @Override
207     public void onTransactionChainSuccessful(final TransactionChain chain) {
208         LOG.debug("Transaction chain {} successful.", chain);
209     }
210 }