Migrate to MD-SAL APIs
[bgpcep.git] / bgp / openconfig-state / src / main / java / org / opendaylight / protocol / bgp / state / StateProviderImpl.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.protocol.bgp.state;
9
10 import static java.util.Objects.requireNonNull;
11 import static java.util.concurrent.TimeUnit.SECONDS;
12
13 import com.google.common.util.concurrent.FutureCallback;
14 import com.google.common.util.concurrent.MoreExecutors;
15 import java.util.HashMap;
16 import java.util.HashSet;
17 import java.util.List;
18 import java.util.Map;
19 import java.util.Set;
20 import java.util.TimerTask;
21 import java.util.concurrent.Executors;
22 import java.util.concurrent.ScheduledExecutorService;
23 import java.util.concurrent.ScheduledFuture;
24 import java.util.concurrent.atomic.AtomicBoolean;
25 import java.util.stream.Collectors;
26 import org.checkerframework.checker.lock.qual.GuardedBy;
27 import org.eclipse.jdt.annotation.NonNull;
28 import org.opendaylight.mdsal.binding.api.DataBroker;
29 import org.opendaylight.mdsal.binding.api.Transaction;
30 import org.opendaylight.mdsal.binding.api.TransactionChain;
31 import org.opendaylight.mdsal.binding.api.TransactionChainListener;
32 import org.opendaylight.mdsal.binding.api.WriteTransaction;
33 import org.opendaylight.mdsal.common.api.CommitInfo;
34 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
35 import org.opendaylight.protocol.bgp.openconfig.spi.BGPTableTypeRegistryConsumer;
36 import org.opendaylight.protocol.bgp.rib.spi.state.BGPPeerState;
37 import org.opendaylight.protocol.bgp.rib.spi.state.BGPRibState;
38 import org.opendaylight.protocol.bgp.rib.spi.state.BGPStateConsumer;
39 import org.opendaylight.yang.gen.v1.http.openconfig.net.yang.bgp.rev151009.bgp.top.Bgp;
40 import org.opendaylight.yang.gen.v1.http.openconfig.net.yang.bgp.rev151009.bgp.top.BgpBuilder;
41 import org.opendaylight.yang.gen.v1.http.openconfig.net.yang.bgp.rev151009.bgp.top.bgp.Global;
42 import org.opendaylight.yang.gen.v1.http.openconfig.net.yang.bgp.rev151009.bgp.top.bgp.Neighbors;
43 import org.opendaylight.yang.gen.v1.http.openconfig.net.yang.bgp.rev151009.bgp.top.bgp.PeerGroups;
44 import org.opendaylight.yang.gen.v1.http.openconfig.net.yang.network.instance.rev151018.network.instance.top.NetworkInstances;
45 import org.opendaylight.yang.gen.v1.http.openconfig.net.yang.network.instance.rev151018.network.instance.top.network.instances.NetworkInstance;
46 import org.opendaylight.yang.gen.v1.http.openconfig.net.yang.network.instance.rev151018.network.instance.top.network.instances.NetworkInstanceKey;
47 import org.opendaylight.yang.gen.v1.http.openconfig.net.yang.network.instance.rev151018.network.instance.top.network.instances.network.instance.Protocols;
48 import org.opendaylight.yang.gen.v1.http.openconfig.net.yang.network.instance.rev151018.network.instance.top.network.instances.network.instance.protocols.Protocol;
49 import org.opendaylight.yang.gen.v1.http.openconfig.net.yang.network.instance.rev151018.network.instance.top.network.instances.network.instance.protocols.ProtocolKey;
50 import org.opendaylight.yang.gen.v1.http.openconfig.net.yang.policy.types.rev151009.BGP;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.openconfig.extensions.rev180329.NetworkInstanceProtocol;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev180329.bgp.rib.Rib;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev180329.bgp.rib.RibKey;
54 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
55 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
56 import org.slf4j.Logger;
57 import org.slf4j.LoggerFactory;
58
59 // This class is thread-safe
60 public final class StateProviderImpl implements TransactionChainListener, AutoCloseable {
61     private static final Logger LOG = LoggerFactory.getLogger(StateProviderImpl.class);
62     private final BGPStateConsumer stateCollector;
63     private final BGPTableTypeRegistryConsumer bgpTableTypeRegistry;
64     private final KeyedInstanceIdentifier<NetworkInstance, NetworkInstanceKey> networkInstanceIId;
65     private final int timeout;
66     private final DataBroker dataBroker;
67     @GuardedBy("this")
68     private final Map<String, InstanceIdentifier<Bgp>> instanceIdentifiersCache = new HashMap<>();
69     @GuardedBy("this")
70     private TransactionChain transactionChain;
71     @GuardedBy("this")
72     private ScheduledFuture<?> scheduleTask;
73     private final ScheduledExecutorService scheduler;
74     private final AtomicBoolean closed = new AtomicBoolean(false);
75
76     public StateProviderImpl(final @NonNull DataBroker dataBroker, final int timeout,
77             final @NonNull BGPTableTypeRegistryConsumer bgpTableTypeRegistry,
78             final @NonNull BGPStateConsumer stateCollector, final @NonNull String networkInstanceName) {
79         this(dataBroker, timeout, bgpTableTypeRegistry, stateCollector, networkInstanceName,
80                 Executors.newScheduledThreadPool(1));
81     }
82
83     public StateProviderImpl(final @NonNull DataBroker dataBroker, final int timeout,
84             final @NonNull BGPTableTypeRegistryConsumer bgpTableTypeRegistry,
85             final @NonNull BGPStateConsumer stateCollector,
86             final @NonNull String networkInstanceName, final @NonNull ScheduledExecutorService scheduler) {
87         this.dataBroker = requireNonNull(dataBroker);
88         this.bgpTableTypeRegistry = requireNonNull(bgpTableTypeRegistry);
89         this.stateCollector = requireNonNull(stateCollector);
90         this.networkInstanceIId = InstanceIdentifier.create(NetworkInstances.class)
91                 .child(NetworkInstance.class, new NetworkInstanceKey(networkInstanceName));
92         this.timeout = timeout;
93         this.scheduler = scheduler;
94     }
95
96     public synchronized void init() {
97         this.transactionChain = this.dataBroker.createMergingTransactionChain(this);
98         final TimerTask task = new TimerTask() {
99             @Override
100             @SuppressWarnings("checkstyle:IllegalCatch")
101             public void run() {
102                 synchronized (StateProviderImpl.this) {
103                     final WriteTransaction wTx = StateProviderImpl.this.transactionChain.newWriteOnlyTransaction();
104                     try {
105                         updateBGPStats(wTx);
106
107                         wTx.commit().addCallback(new FutureCallback<CommitInfo>() {
108                             @Override
109                             public void onSuccess(final CommitInfo result) {
110                                 LOG.debug("Successfully committed BGP stats update");
111                             }
112
113                             @Override
114                             public void onFailure(final Throwable ex) {
115                                 LOG.error("Failed to commit BGP stats update", ex);
116                             }
117                         }, MoreExecutors.directExecutor());
118                     } catch (final Exception e) {
119                         LOG.warn("Failed to prepare Tx for BGP stats update", e);
120                         wTx.cancel();
121                     }
122                 }
123             }
124         };
125
126         this.scheduleTask = this.scheduler.scheduleAtFixedRate(task, 0, this.timeout, SECONDS);
127     }
128
129     private synchronized void updateBGPStats(final WriteTransaction wtx) {
130         final Set<String> oldStats = new HashSet<>(this.instanceIdentifiersCache.keySet());
131         this.stateCollector.getRibStats().stream().filter(BGPRibState::isActive).forEach(bgpStateConsumer -> {
132             final KeyedInstanceIdentifier<Rib, RibKey> ribId = bgpStateConsumer.getInstanceIdentifier();
133             final List<BGPPeerState> peerStats = this.stateCollector.getPeerStats().stream()
134                     .filter(BGPPeerState::isActive).filter(peerState -> ribId.equals(peerState.getInstanceIdentifier()))
135                     .collect(Collectors.toList());
136             storeOperationalState(bgpStateConsumer, peerStats, ribId.getKey().getId().getValue(), wtx);
137             oldStats.remove(ribId.getKey().getId().getValue());
138         });
139         oldStats.forEach(ribId -> removeStoredOperationalState(ribId, wtx));
140     }
141
142     private synchronized void removeStoredOperationalState(final String ribId, final WriteTransaction wtx) {
143         final InstanceIdentifier<Bgp> bgpIID = this.instanceIdentifiersCache.remove(ribId);
144         wtx.delete(LogicalDatastoreType.OPERATIONAL, bgpIID);
145     }
146
147     private synchronized void storeOperationalState(final BGPRibState bgpStateConsumer,
148             final List<BGPPeerState> peerStats, final String ribId, final WriteTransaction wtx) {
149         final Global global = GlobalUtil.buildGlobal(bgpStateConsumer, this.bgpTableTypeRegistry);
150         final PeerGroups peerGroups = PeerGroupUtil.buildPeerGroups(peerStats);
151         final Neighbors neighbors = NeighborUtil.buildNeighbors(peerStats, this.bgpTableTypeRegistry);
152         InstanceIdentifier<Bgp> bgpIID = this.instanceIdentifiersCache.get(ribId);
153         if (bgpIID == null) {
154             final ProtocolKey protocolKey = new ProtocolKey(BGP.class, bgpStateConsumer.getInstanceIdentifier()
155                     .getKey().getId().getValue());
156             final KeyedInstanceIdentifier<Protocol, ProtocolKey> protocolIId = this.networkInstanceIId
157                     .child(Protocols.class).child(Protocol.class, protocolKey);
158             bgpIID = protocolIId.augmentation(NetworkInstanceProtocol.class).child(Bgp.class);
159             this.instanceIdentifiersCache.put(ribId, bgpIID);
160         }
161
162         final Bgp bgp = new BgpBuilder().setGlobal(global).setNeighbors(neighbors).setPeerGroups(peerGroups).build();
163         wtx.put(LogicalDatastoreType.OPERATIONAL, bgpIID, bgp, WriteTransaction.CREATE_MISSING_PARENTS);
164     }
165
166     @Override
167     public synchronized void close() {
168         if (closed.compareAndSet(false, true)) {
169             this.scheduleTask.cancel(true);
170             if (!this.instanceIdentifiersCache.keySet().isEmpty()) {
171                 final WriteTransaction wTx = this.transactionChain.newWriteOnlyTransaction();
172                 this.instanceIdentifiersCache.keySet().iterator()
173                 .forEachRemaining(ribId -> removeStoredOperationalState(ribId, wTx));
174                 wTx.commit().addCallback(new FutureCallback<CommitInfo>() {
175                     @Override
176                     public void onSuccess(final CommitInfo result) {
177                         LOG.trace("Successfully operational stats removed.");
178                     }
179
180                     @Override
181                     public void onFailure(final Throwable throwable) {
182                         LOG.error("Failed to clean up operational stats", throwable);
183                     }
184                 }, MoreExecutors.directExecutor());
185             }
186             this.transactionChain.close();
187             this.scheduler.shutdown();
188         }
189     }
190
191     @Override
192     public synchronized void onTransactionChainFailed(final TransactionChain chain, final Transaction transaction,
193             final Throwable cause) {
194         LOG.error("Transaction chain {} failed for tx {}",
195                 chain, transaction != null ? transaction.getIdentifier() : null, cause);
196
197         if (!closed.get()) {
198             transactionChain.close();
199             transactionChain = dataBroker.createMergingTransactionChain(this);
200         }
201     }
202
203     @Override
204     public void onTransactionChainSuccessful(final TransactionChain chain) {
205         LOG.debug("Transaction chain {} successful.", chain);
206     }
207 }