Migrate deprecated submit() to commit() for BGP/BMP
[bgpcep.git] / bgp / openconfig-state / src / main / java / org / opendaylight / protocol / bgp / state / StateProviderImpl.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.protocol.bgp.state;
10
11 import static java.util.Objects.requireNonNull;
12 import static java.util.concurrent.TimeUnit.SECONDS;
13
14 import com.google.common.util.concurrent.FutureCallback;
15 import com.google.common.util.concurrent.MoreExecutors;
16 import java.util.HashMap;
17 import java.util.HashSet;
18 import java.util.List;
19 import java.util.Map;
20 import java.util.Set;
21 import java.util.TimerTask;
22 import java.util.concurrent.Executors;
23 import java.util.concurrent.ScheduledExecutorService;
24 import java.util.concurrent.ScheduledFuture;
25 import java.util.concurrent.atomic.AtomicBoolean;
26 import java.util.stream.Collectors;
27 import javax.annotation.Nonnull;
28 import javax.annotation.concurrent.GuardedBy;
29 import javax.annotation.concurrent.ThreadSafe;
30 import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
31 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
32 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
33 import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
34 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
35 import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
36 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
37 import org.opendaylight.mdsal.common.api.CommitInfo;
38 import org.opendaylight.protocol.bgp.openconfig.spi.BGPTableTypeRegistryConsumer;
39 import org.opendaylight.protocol.bgp.rib.spi.state.BGPPeerState;
40 import org.opendaylight.protocol.bgp.rib.spi.state.BGPRibState;
41 import org.opendaylight.protocol.bgp.rib.spi.state.BGPStateConsumer;
42 import org.opendaylight.yang.gen.v1.http.openconfig.net.yang.bgp.rev151009.bgp.top.Bgp;
43 import org.opendaylight.yang.gen.v1.http.openconfig.net.yang.bgp.rev151009.bgp.top.BgpBuilder;
44 import org.opendaylight.yang.gen.v1.http.openconfig.net.yang.bgp.rev151009.bgp.top.bgp.Global;
45 import org.opendaylight.yang.gen.v1.http.openconfig.net.yang.bgp.rev151009.bgp.top.bgp.Neighbors;
46 import org.opendaylight.yang.gen.v1.http.openconfig.net.yang.bgp.rev151009.bgp.top.bgp.PeerGroups;
47 import org.opendaylight.yang.gen.v1.http.openconfig.net.yang.network.instance.rev151018.network.instance.top.NetworkInstances;
48 import org.opendaylight.yang.gen.v1.http.openconfig.net.yang.network.instance.rev151018.network.instance.top.network.instances.NetworkInstance;
49 import org.opendaylight.yang.gen.v1.http.openconfig.net.yang.network.instance.rev151018.network.instance.top.network.instances.NetworkInstanceKey;
50 import org.opendaylight.yang.gen.v1.http.openconfig.net.yang.network.instance.rev151018.network.instance.top.network.instances.network.instance.Protocols;
51 import org.opendaylight.yang.gen.v1.http.openconfig.net.yang.network.instance.rev151018.network.instance.top.network.instances.network.instance.protocols.Protocol;
52 import org.opendaylight.yang.gen.v1.http.openconfig.net.yang.network.instance.rev151018.network.instance.top.network.instances.network.instance.protocols.ProtocolKey;
53 import org.opendaylight.yang.gen.v1.http.openconfig.net.yang.policy.types.rev151009.BGP;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.openconfig.extensions.rev180329.NetworkInstanceProtocol;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev180329.bgp.rib.Rib;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev180329.bgp.rib.RibKey;
57 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
58 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
59 import org.slf4j.Logger;
60 import org.slf4j.LoggerFactory;
61
62 @ThreadSafe
63 public final class StateProviderImpl implements TransactionChainListener, AutoCloseable {
64     private static final Logger LOG = LoggerFactory.getLogger(StateProviderImpl.class);
65     private final BGPStateConsumer stateCollector;
66     private final BGPTableTypeRegistryConsumer bgpTableTypeRegistry;
67     private final KeyedInstanceIdentifier<NetworkInstance, NetworkInstanceKey> networkInstanceIId;
68     private final int timeout;
69     private final DataBroker dataBroker;
70     @GuardedBy("this")
71     private final Map<String, InstanceIdentifier<Bgp>> instanceIdentifiersCache = new HashMap<>();
72     @GuardedBy("this")
73     private BindingTransactionChain transactionChain;
74     @GuardedBy("this")
75     private ScheduledFuture<?> scheduleTask;
76     private final ScheduledExecutorService scheduler;
77     private final AtomicBoolean closed = new AtomicBoolean(false);
78
79     public StateProviderImpl(@Nonnull final DataBroker dataBroker, final int timeout,
80             @Nonnull final BGPTableTypeRegistryConsumer bgpTableTypeRegistry,
81             @Nonnull final BGPStateConsumer stateCollector,
82             @Nonnull final String networkInstanceName) {
83         this(dataBroker, timeout, bgpTableTypeRegistry, stateCollector, networkInstanceName,
84                 Executors.newScheduledThreadPool(1));
85     }
86
87     public StateProviderImpl(@Nonnull final DataBroker dataBroker, final int timeout,
88             @Nonnull final BGPTableTypeRegistryConsumer bgpTableTypeRegistry,
89             @Nonnull final BGPStateConsumer stateCollector,
90             @Nonnull final String networkInstanceName,
91             @Nonnull final ScheduledExecutorService scheduler) {
92         this.dataBroker = requireNonNull(dataBroker);
93         this.bgpTableTypeRegistry = requireNonNull(bgpTableTypeRegistry);
94         this.stateCollector = requireNonNull(stateCollector);
95         this.networkInstanceIId = InstanceIdentifier.create(NetworkInstances.class)
96                 .child(NetworkInstance.class, new NetworkInstanceKey(networkInstanceName));
97         this.timeout = timeout;
98         this.scheduler = scheduler;
99     }
100
101     public synchronized void init() {
102         this.transactionChain = this.dataBroker.createTransactionChain(this);
103         final TimerTask task = new TimerTask() {
104             @Override
105             @SuppressWarnings("checkstyle:IllegalCatch")
106             public void run() {
107                 synchronized (StateProviderImpl.this) {
108                     final WriteTransaction wTx = StateProviderImpl.this.transactionChain.newWriteOnlyTransaction();
109                     try {
110                         updateBGPStats(wTx);
111
112                         wTx.commit().addCallback(new FutureCallback<CommitInfo>() {
113                             @Override
114                             public void onSuccess(CommitInfo result) {
115                                 LOG.debug("Successfully committed BGP stats update");
116                             }
117
118                             @Override
119                             public void onFailure(Throwable ex) {
120                                 LOG.error("Failed to commit BGP stats update", ex);
121                             }
122                         }, MoreExecutors.directExecutor());
123                     } catch (final Exception e) {
124                         LOG.warn("Failed to prepare Tx for BGP stats update", e);
125                         wTx.cancel();
126                     }
127                 }
128             }
129         };
130
131         this.scheduleTask = this.scheduler.scheduleAtFixedRate(task, 0, this.timeout, SECONDS);
132     }
133
134     private synchronized void updateBGPStats(final WriteTransaction wtx) {
135         final Set<String> oldStats = new HashSet<>(this.instanceIdentifiersCache.keySet());
136         this.stateCollector.getRibStats().stream().filter(BGPRibState::isActive).forEach(bgpStateConsumer -> {
137             final KeyedInstanceIdentifier<Rib, RibKey> ribId = bgpStateConsumer.getInstanceIdentifier();
138             final List<BGPPeerState> peerStats = this.stateCollector.getPeerStats().stream()
139                     .filter(BGPPeerState::isActive).filter(peerState -> ribId.equals(peerState.getInstanceIdentifier()))
140                     .collect(Collectors.toList());
141             storeOperationalState(bgpStateConsumer, peerStats, ribId.getKey().getId().getValue(), wtx);
142             oldStats.remove(ribId.getKey().getId().getValue());
143         });
144         oldStats.forEach(ribId -> removeStoredOperationalState(ribId, wtx));
145     }
146
147     private synchronized void removeStoredOperationalState(final String ribId, final WriteTransaction wtx) {
148         final InstanceIdentifier<Bgp> bgpIID = this.instanceIdentifiersCache.remove(ribId);
149         wtx.delete(LogicalDatastoreType.OPERATIONAL, bgpIID);
150     }
151
152     private synchronized void storeOperationalState(final BGPRibState bgpStateConsumer,
153             final List<BGPPeerState> peerStats, final String ribId, final WriteTransaction wtx) {
154         final Global global = GlobalUtil.buildGlobal(bgpStateConsumer, this.bgpTableTypeRegistry);
155         final PeerGroups peerGroups = PeerGroupUtil.buildPeerGroups(peerStats);
156         final Neighbors neighbors = NeighborUtil.buildNeighbors(peerStats, this.bgpTableTypeRegistry);
157         InstanceIdentifier<Bgp> bgpIID = this.instanceIdentifiersCache.get(ribId);
158         if (bgpIID == null) {
159             final ProtocolKey protocolKey = new ProtocolKey(BGP.class, bgpStateConsumer.getInstanceIdentifier()
160                     .getKey().getId().getValue());
161             final KeyedInstanceIdentifier<Protocol, ProtocolKey> protocolIId = this.networkInstanceIId
162                     .child(Protocols.class).child(Protocol.class, protocolKey);
163             bgpIID = protocolIId.augmentation(NetworkInstanceProtocol.class).child(Bgp.class);
164             this.instanceIdentifiersCache.put(ribId, bgpIID);
165         }
166
167         final Bgp bgp = new BgpBuilder().setGlobal(global).setNeighbors(neighbors).setPeerGroups(peerGroups).build();
168         wtx.put(LogicalDatastoreType.OPERATIONAL, bgpIID, bgp, WriteTransaction.CREATE_MISSING_PARENTS);
169     }
170
171     @Override
172     public synchronized void close() {
173         if (closed.compareAndSet(false, true)) {
174             this.scheduleTask.cancel(true);
175             if (!this.instanceIdentifiersCache.keySet().isEmpty()) {
176                 final WriteTransaction wTx = this.transactionChain.newWriteOnlyTransaction();
177                 this.instanceIdentifiersCache.keySet().iterator()
178                 .forEachRemaining(ribId -> removeStoredOperationalState(ribId, wTx));
179                 wTx.commit().addCallback(new FutureCallback<CommitInfo>() {
180                     @Override
181                     public void onSuccess(final CommitInfo result) {
182                         LOG.trace("Successfully operational stats removed.");
183                     }
184
185                     @Override
186                     public void onFailure(final Throwable throwable) {
187                         LOG.error("Failed to clean up operational stats", throwable);
188                     }
189                 }, MoreExecutors.directExecutor());
190             }
191             this.transactionChain.close();
192             this.scheduler.shutdown();
193         }
194     }
195
196     @Override
197     public synchronized void onTransactionChainFailed(final TransactionChain<?, ?> chain,
198             final AsyncTransaction<?, ?> transaction, final Throwable cause) {
199         LOG.error("Transaction chain {} failed for tx {}",
200                 chain, transaction != null ? transaction.getIdentifier() : null, cause);
201
202         if (!closed.get()) {
203             transactionChain.close();
204             transactionChain = dataBroker.createTransactionChain(this);
205         }
206     }
207
208     @Override
209     public void onTransactionChainSuccessful(final TransactionChain<?, ?> chain) {
210         LOG.debug("Transaction chain {} successful.", chain);
211     }
212 }