Handle race-conditions in BGP shutdown code
[bgpcep.git] / bgp / rib-impl / src / main / java / org / opendaylight / protocol / bgp / rib / impl / config / BGPClusterSingletonService.java
1 /*
2  * Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.protocol.bgp.rib.impl.config;
9
10 import static org.opendaylight.protocol.bgp.rib.impl.config.OpenConfigMappingUtil.APPLICATION_PEER_GROUP_NAME;
11 import static org.opendaylight.protocol.bgp.rib.impl.config.OpenConfigMappingUtil.APPLICATION_PEER_GROUP_NAME_OPT;
12 import static org.opendaylight.protocol.bgp.rib.impl.config.OpenConfigMappingUtil.getNeighborInstanceIdentifier;
13 import static org.opendaylight.protocol.bgp.rib.impl.config.OpenConfigMappingUtil.getNeighborInstanceName;
14 import static org.opendaylight.protocol.bgp.rib.impl.config.OpenConfigMappingUtil.getRibInstanceName;
15
16 import com.google.common.util.concurrent.FutureCallback;
17 import com.google.common.util.concurrent.Futures;
18 import com.google.common.util.concurrent.ListenableFuture;
19 import com.google.common.util.concurrent.MoreExecutors;
20 import com.google.common.util.concurrent.SettableFuture;
21 import java.util.ArrayList;
22 import java.util.Dictionary;
23 import java.util.HashMap;
24 import java.util.Hashtable;
25 import java.util.List;
26 import java.util.Map;
27 import java.util.Optional;
28 import java.util.concurrent.TimeUnit;
29 import java.util.concurrent.atomic.AtomicBoolean;
30 import java.util.stream.Collectors;
31 import org.apache.commons.lang3.StringUtils;
32 import org.checkerframework.checker.lock.qual.GuardedBy;
33 import org.checkerframework.checker.lock.qual.Holding;
34 import org.eclipse.jdt.annotation.NonNull;
35 import org.gaul.modernizer_maven_annotations.SuppressModernizer;
36 import org.opendaylight.mdsal.binding.api.DataObjectModification;
37 import org.opendaylight.mdsal.common.api.CommitInfo;
38 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonService;
39 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
40 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
41 import org.opendaylight.protocol.bgp.openconfig.spi.BGPTableTypeRegistryConsumer;
42 import org.opendaylight.protocol.bgp.rib.impl.spi.InstanceType;
43 import org.opendaylight.protocol.bgp.rib.spi.util.ClusterSingletonServiceRegistrationHelper;
44 import org.opendaylight.yang.gen.v1.http.openconfig.net.yang.bgp.rev151009.bgp.neighbor.group.Config;
45 import org.opendaylight.yang.gen.v1.http.openconfig.net.yang.bgp.rev151009.bgp.neighbors.Neighbor;
46 import org.opendaylight.yang.gen.v1.http.openconfig.net.yang.bgp.rev151009.bgp.top.Bgp;
47 import org.opendaylight.yang.gen.v1.http.openconfig.net.yang.bgp.rev151009.bgp.top.bgp.Global;
48 import org.opendaylight.yang.gen.v1.http.openconfig.net.yang.bgp.rev151009.bgp.top.bgp.Neighbors;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.openconfig.extensions.rev180329.NeighborPeerGroupConfig;
50 import org.opendaylight.yangtools.yang.binding.DataObject;
51 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
52 import org.osgi.framework.BundleContext;
53 import org.osgi.framework.ServiceRegistration;
54 import org.osgi.service.blueprint.container.BlueprintContainer;
55 import org.slf4j.Logger;
56 import org.slf4j.LoggerFactory;
57
58 public final class BGPClusterSingletonService implements ClusterSingletonService, AutoCloseable {
59
60     private static final Logger LOG = LoggerFactory.getLogger(BGPClusterSingletonService.class);
61
62     private static final long TIMEOUT_NS = TimeUnit.SECONDS.toNanos(5);
63     private final InstanceIdentifier<Bgp> bgpIid;
64     @GuardedBy("this")
65     private final Map<InstanceIdentifier<Neighbor>, PeerBean> peers = new HashMap<>();
66     @GuardedBy("this")
67     private final Map<String, List<PeerBean>> peersGroups = new HashMap<>();
68     private final BGPTableTypeRegistryConsumer tableTypeRegistry;
69     private final BlueprintContainer container;
70     private final BundleContext bundleContext;
71     private final ServiceGroupIdentifier serviceGroupIdentifier;
72     private final AtomicBoolean instantiated = new AtomicBoolean(false);
73     private final PeerGroupConfigLoader peerGroupLoader;
74     @GuardedBy("this")
75     private RibImpl ribImpl;
76
77
78     BGPClusterSingletonService(
79             final @NonNull PeerGroupConfigLoader peerGroupLoader,
80             final @NonNull ClusterSingletonServiceProvider provider,
81             final @NonNull BGPTableTypeRegistryConsumer tableTypeRegistry,
82             final @NonNull BlueprintContainer container,
83             final @NonNull BundleContext bundleContext,
84             final @NonNull InstanceIdentifier<Bgp> bgpIid) {
85         this.peerGroupLoader = peerGroupLoader;
86         this.tableTypeRegistry = tableTypeRegistry;
87         this.container = container;
88         this.bundleContext = bundleContext;
89         this.bgpIid = bgpIid;
90         final String ribInstanceName = getRibInstanceName(bgpIid);
91         this.serviceGroupIdentifier = ServiceGroupIdentifier.create(ribInstanceName + "-service-group");
92         LOG.info("BGPClusterSingletonService {} registered", this.serviceGroupIdentifier.getName());
93         ClusterSingletonServiceRegistrationHelper
94                 .registerSingletonService(provider, this);
95     }
96
97     @Override
98     public synchronized void instantiateServiceInstance() {
99         if (this.ribImpl != null) {
100             this.ribImpl.instantiateServiceInstance();
101             this.peers.values().forEach(PeerBean::instantiateServiceInstance);
102         }
103         this.instantiated.set(true);
104         LOG.info("BGPClusterSingletonService {} instantiated", this.serviceGroupIdentifier.getName());
105     }
106
107     @Override
108     public synchronized ListenableFuture<? extends CommitInfo> closeServiceInstance() {
109         LOG.info("BGPClusterSingletonService {} close service instance", this.serviceGroupIdentifier.getName());
110         this.instantiated.set(false);
111
112         final List<ListenableFuture<? extends CommitInfo>> futurePeerCloseList = this.peers.values().stream()
113                 .map(PeerBean::closeServiceInstance).collect(Collectors.toList());
114         final SettableFuture<? extends CommitInfo> done = SettableFuture.create();
115
116         final ListenableFuture<List<CommitInfo>> futureResult = Futures.allAsList(futurePeerCloseList);
117         Futures.addCallback(futureResult, new FutureCallback<List<? extends CommitInfo>>() {
118             @Override
119             public void onSuccess(final List<? extends CommitInfo> result) {
120                 synchronized (BGPClusterSingletonService.this) {
121                     if (BGPClusterSingletonService.this.ribImpl != null) {
122                         done.setFuture(Futures.transform(BGPClusterSingletonService.this.ribImpl.closeServiceInstance(),
123                             input -> null, MoreExecutors.directExecutor()));
124                     } else {
125                         done.setFuture(Futures.transform(CommitInfo.emptyFluentFuture(),
126                             input -> null, MoreExecutors.directExecutor()));
127                     }
128                 }
129             }
130
131             @Override
132             public void onFailure(final Throwable throwable) {
133                 LOG.warn("Failed removing peers", throwable);
134             }
135         }, MoreExecutors.directExecutor());
136         return done;
137     }
138
139     @Override
140     public ServiceGroupIdentifier getIdentifier() {
141         return this.serviceGroupIdentifier;
142     }
143
144     synchronized void onGlobalChanged(final DataObjectModification<Global> dataObjectModification) {
145         switch (dataObjectModification.getModificationType()) {
146             case DELETE:
147                 LOG.debug("Removing RIB instance: {}", this.bgpIid);
148                 if (this.ribImpl != null) {
149                     LOG.debug("RIB instance removed {}", this.ribImpl);
150                     closeAllBindedPeers();
151                     closeRibService();
152                     this.ribImpl = null;
153                 }
154                 break;
155             case SUBTREE_MODIFIED:
156             case WRITE:
157                 final Global global = dataObjectModification.getDataAfter();
158                 if (this.ribImpl == null) {
159                     onGlobalCreated(global);
160                 } else if (!this.ribImpl.isGlobalEqual(global)) {
161                     onGlobalUpdated(global);
162                 }
163                 break;
164             default:
165                 break;
166         }
167     }
168
169     private synchronized void onGlobalCreated(final Global global) {
170         LOG.debug("Creating RIB instance with configuration: {}", global);
171         this.ribImpl = (RibImpl) this.container.getComponentInstance(InstanceType.RIB.getBeanName());
172         initiateRibInstance(global);
173         LOG.debug("RIB instance created: {}", this.ribImpl);
174     }
175
176     private synchronized void onGlobalUpdated(final Global global) {
177         LOG.debug("Modifying RIB instance with configuration: {}", global);
178         final List<PeerBean> closedPeers = closeAllBindedPeers();
179         closeRibService();
180         initiateRibInstance(global);
181         for (final PeerBean peer : closedPeers) {
182             peer.restart(this.ribImpl, this.bgpIid, this.peerGroupLoader, this.tableTypeRegistry);
183         }
184         if (this.instantiated.get()) {
185             closedPeers.forEach(PeerBean::instantiateServiceInstance);
186         }
187         LOG.debug("RIB instance created: {}", this.ribImpl);
188     }
189
190     @Holding("this")
191     @SuppressWarnings("checkstyle:illegalCatch")
192     private void closeRibService() {
193         try {
194             this.ribImpl.closeServiceInstance().get(TIMEOUT_NS, TimeUnit.NANOSECONDS);
195         } catch (final Exception e) {
196             LOG.error("RIB instance failed to close service instance", e);
197         }
198         this.ribImpl.close();
199     }
200
201     @Holding("this")
202     private void initiateRibInstance(final Global global) {
203         final String ribInstanceName = getRibInstanceName(this.bgpIid);
204         ribImpl.start(global, ribInstanceName, this.tableTypeRegistry);
205         registerRibInstance(ribImpl, ribInstanceName);
206         if (this.instantiated.get()) {
207             this.ribImpl.instantiateServiceInstance();
208         }
209     }
210
211     @SuppressWarnings("checkstyle:illegalCatch")
212     private synchronized List<PeerBean> closeAllBindedPeers() {
213         final List<PeerBean> filtered = new ArrayList<>();
214         this.peers.forEach((key, peer) -> {
215             try {
216                 peer.closeServiceInstance().get();
217             } catch (final Exception e) {
218                 LOG.error("Peer instance failed to close service instance", e);
219             }
220             peer.close();
221             filtered.add(peer);
222         });
223         return filtered;
224     }
225
226     private synchronized void registerRibInstance(final RibImpl rib, final String ribInstanceName) {
227         final ServiceRegistration<?> serviceRegistration = this.bundleContext.registerService(
228                 InstanceType.RIB.getServices(), rib, dictionaryOf(InstanceType.RIB.getBeanName(), ribInstanceName));
229         rib.setServiceRegistration(serviceRegistration);
230     }
231
232     @Override
233     public synchronized void close() {
234         LOG.info("BGPClusterSingletonService {} close", this.serviceGroupIdentifier.getName());
235         this.peers.values().iterator().forEachRemaining(PeerBean::close);
236         this.ribImpl.close();
237         this.peers.clear();
238         this.ribImpl = null;
239     }
240
241     synchronized void onNeighborsChanged(final DataObjectModification<Neighbors> dataObjectModification) {
242         for (final DataObjectModification<? extends DataObject> neighborModification :
243                 dataObjectModification.getModifiedChildren()) {
244             switch (neighborModification.getModificationType()) {
245                 case DELETE:
246                     onNeighborRemoved((Neighbor) neighborModification.getDataBefore());
247                     break;
248                 case SUBTREE_MODIFIED:
249                 case WRITE:
250                     onNeighborModified((Neighbor) neighborModification.getDataAfter());
251                     break;
252                 default:
253                     break;
254             }
255         }
256     }
257
258     private synchronized void onNeighborModified(final Neighbor neighbor) {
259         //restart peer instance with a new configuration
260         final PeerBean bgpPeer = this.peers.get(getNeighborInstanceIdentifier(this.bgpIid, neighbor.key()));
261         if (bgpPeer == null) {
262             onNeighborCreated(neighbor);
263         } else if (!bgpPeer.containsEqualConfiguration(neighbor)) {
264             onNeighborUpdated(bgpPeer, neighbor);
265         }
266     }
267
268     private synchronized void onNeighborCreated(final Neighbor neighbor) {
269         LOG.debug("Creating Peer instance with configuration: {}", neighbor);
270         final PeerBean bgpPeer;
271         if (OpenConfigMappingUtil.isApplicationPeer(neighbor)) {
272             bgpPeer = (PeerBean) this.container.getComponentInstance(InstanceType.APP_PEER.getBeanName());
273         } else {
274             bgpPeer = (PeerBean) this.container.getComponentInstance(InstanceType.PEER.getBeanName());
275         }
276         final InstanceIdentifier<Neighbor> neighborInstanceIdentifier =
277                 getNeighborInstanceIdentifier(this.bgpIid, neighbor.key());
278         initiatePeerInstance(neighborInstanceIdentifier, neighbor, bgpPeer);
279         this.peers.put(neighborInstanceIdentifier, bgpPeer);
280
281         final Optional<String> peerGroupName = getPeerGroupName(neighbor.getConfig());
282         peerGroupName.ifPresent(s -> this.peersGroups.computeIfAbsent(s, k -> new ArrayList<>()).add(bgpPeer));
283         LOG.debug("Peer instance created {}", neighbor.key().getNeighborAddress());
284     }
285
286     private static Optional<String> getPeerGroupName(final Config config) {
287         if (config == null) {
288             return Optional.empty();
289         }
290         final NeighborPeerGroupConfig aug = config.augmentation(NeighborPeerGroupConfig.class);
291         if (aug == null || aug.getPeerGroup() == null) {
292             return Optional.empty();
293         }
294         final String peerGroupName = aug.getPeerGroup();
295         if (peerGroupName.equals(APPLICATION_PEER_GROUP_NAME)) {
296             return APPLICATION_PEER_GROUP_NAME_OPT;
297         }
298         return Optional.of(StringUtils.substringBetween(peerGroupName, "=\"", "\""));
299     }
300
301     private synchronized void onNeighborUpdated(final PeerBean bgpPeer, final Neighbor neighbor) {
302         LOG.debug("Updating Peer instance with configuration: {}", neighbor);
303         closePeer(bgpPeer);
304
305         final InstanceIdentifier<Neighbor> neighborInstanceIdentifier
306                 = getNeighborInstanceIdentifier(this.bgpIid, neighbor.key());
307         initiatePeerInstance(neighborInstanceIdentifier, neighbor, bgpPeer);
308         LOG.debug("Peer instance updated {}", bgpPeer);
309     }
310
311     @SuppressWarnings("checkstyle:illegalCatch")
312     private static void closePeer(final PeerBean bgpPeer) {
313         if (bgpPeer != null) {
314             try {
315                 bgpPeer.closeServiceInstance().get();
316                 bgpPeer.close();
317                 LOG.debug("Peer instance closed {}", bgpPeer);
318             } catch (final Exception e) {
319                 LOG.error("Peer instance failed to close service instance", e);
320             }
321         }
322     }
323
324     private synchronized void onNeighborRemoved(final Neighbor neighbor) {
325         LOG.debug("Removing Peer instance: {}", neighbor);
326         final PeerBean bgpPeer = this.peers.remove(getNeighborInstanceIdentifier(this.bgpIid, neighbor.key()));
327
328         final Optional<String> groupName = getPeerGroupName(neighbor.getConfig());
329         groupName.ifPresent(s -> this.peersGroups.computeIfPresent(s, (k, groupPeers) -> {
330             groupPeers.remove(bgpPeer);
331             return groupPeers.isEmpty() ? null : groupPeers;
332         }));
333         closePeer(bgpPeer);
334     }
335
336     private synchronized void registerPeerInstance(final BgpPeer bgpPeer, final String peerInstanceName) {
337         final ServiceRegistration<?> serviceRegistration = this.bundleContext.registerService(
338             InstanceType.PEER.getServices(), bgpPeer, dictionaryOf(InstanceType.PEER.getBeanName(), peerInstanceName));
339         bgpPeer.setServiceRegistration(serviceRegistration);
340     }
341
342     @SuppressModernizer
343     private static Dictionary<String, String> dictionaryOf(final String key, final String value) {
344         final Dictionary<String, String> properties = new Hashtable<>();
345         properties.put(key, value);
346         return properties;
347     }
348
349     private synchronized void registerAppPeerInstance(final AppPeer appPeer, final String peerInstanceName) {
350         final ServiceRegistration<?> serviceRegistration = this.bundleContext.registerService(
351             InstanceType.APP_PEER.getServices(), appPeer,
352             dictionaryOf(InstanceType.PEER.getBeanName(), peerInstanceName));
353         appPeer.setServiceRegistration(serviceRegistration);
354     }
355
356     private synchronized void initiatePeerInstance(final InstanceIdentifier<Neighbor> neighborIdentifier,
357             final Neighbor neighbor, final PeerBean bgpPeer) {
358         final String peerInstanceName = getNeighborInstanceName(neighborIdentifier);
359         if (this.ribImpl != null) {
360             bgpPeer.start(this.ribImpl, neighbor, this.bgpIid, this.peerGroupLoader, this.tableTypeRegistry);
361             if (bgpPeer instanceof BgpPeer) {
362                 registerPeerInstance((BgpPeer) bgpPeer, peerInstanceName);
363             } else if (bgpPeer instanceof AppPeer) {
364                 registerAppPeerInstance((AppPeer) bgpPeer, peerInstanceName);
365             }
366         }
367         if (this.instantiated.get()) {
368             bgpPeer.instantiateServiceInstance();
369         }
370     }
371
372     @SuppressWarnings("checkstyle:illegalCatch")
373     synchronized void restartNeighbors(final String peerGroupName) {
374         final List<PeerBean> peerGroup = this.peersGroups.get(peerGroupName);
375         if (peerGroup == null) {
376             return;
377         }
378         for (final PeerBean peer : peerGroup) {
379             try {
380                 peer.closeServiceInstance().get();
381             } catch (final Exception e) {
382                 LOG.error("Peer instance failed to close service instance", e);
383             }
384             peer.restart(this.ribImpl, this.bgpIid, this.peerGroupLoader, this.tableTypeRegistry);
385             peer.instantiateServiceInstance();
386         }
387     }
388 }