a978c3839a088879b41cb5488685b81e96a55683
[bgpcep.git] / bgp / rib-impl / src / main / java / org / opendaylight / protocol / bgp / rib / impl / config / BGPClusterSingletonService.java
1 /*
2  * Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.protocol.bgp.rib.impl.config;
9
10 import static java.util.Objects.requireNonNull;
11 import static org.opendaylight.protocol.bgp.rib.impl.config.OpenConfigMappingUtil.APPLICATION_PEER_GROUP_NAME;
12 import static org.opendaylight.protocol.bgp.rib.impl.config.OpenConfigMappingUtil.APPLICATION_PEER_GROUP_NAME_OPT;
13 import static org.opendaylight.protocol.bgp.rib.impl.config.OpenConfigMappingUtil.getNeighborInstanceIdentifier;
14 import static org.opendaylight.protocol.bgp.rib.impl.config.OpenConfigMappingUtil.getRibInstanceName;
15
16 import com.google.common.annotations.VisibleForTesting;
17 import com.google.common.util.concurrent.FutureCallback;
18 import com.google.common.util.concurrent.Futures;
19 import com.google.common.util.concurrent.ListenableFuture;
20 import com.google.common.util.concurrent.MoreExecutors;
21 import com.google.common.util.concurrent.SettableFuture;
22 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
23 import java.util.ArrayList;
24 import java.util.Collection;
25 import java.util.HashMap;
26 import java.util.List;
27 import java.util.Map;
28 import java.util.Optional;
29 import java.util.concurrent.ExecutionException;
30 import java.util.concurrent.atomic.AtomicBoolean;
31 import java.util.stream.Collectors;
32 import org.apache.commons.lang3.StringUtils;
33 import org.checkerframework.checker.lock.qual.GuardedBy;
34 import org.checkerframework.checker.lock.qual.Holding;
35 import org.eclipse.jdt.annotation.NonNull;
36 import org.opendaylight.mdsal.binding.api.DataObjectModification;
37 import org.opendaylight.mdsal.binding.api.RpcProviderService;
38 import org.opendaylight.mdsal.dom.api.DOMDataBroker;
39 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonService;
40 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
41 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
42 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
43 import org.opendaylight.protocol.bgp.openconfig.routing.policy.spi.BGPRibRoutingPolicyFactory;
44 import org.opendaylight.protocol.bgp.openconfig.spi.BGPTableTypeRegistryConsumer;
45 import org.opendaylight.protocol.bgp.rib.impl.spi.BGPDispatcher;
46 import org.opendaylight.protocol.bgp.rib.impl.spi.CodecsRegistry;
47 import org.opendaylight.protocol.bgp.rib.spi.RIBExtensionConsumerContext;
48 import org.opendaylight.protocol.bgp.rib.spi.state.BGPStateProviderRegistry;
49 import org.opendaylight.yang.gen.v1.http.openconfig.net.yang.bgp.rev151009.bgp.neighbor.group.Config;
50 import org.opendaylight.yang.gen.v1.http.openconfig.net.yang.bgp.rev151009.bgp.neighbors.Neighbor;
51 import org.opendaylight.yang.gen.v1.http.openconfig.net.yang.bgp.rev151009.bgp.top.Bgp;
52 import org.opendaylight.yang.gen.v1.http.openconfig.net.yang.bgp.rev151009.bgp.top.bgp.Global;
53 import org.opendaylight.yang.gen.v1.http.openconfig.net.yang.bgp.rev151009.bgp.top.bgp.Neighbors;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.openconfig.extensions.rev180329.network.instance.protocol.NeighborPeerGroupConfig;
55 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
56 import org.opendaylight.yangtools.yang.common.Empty;
57 import org.slf4j.Logger;
58 import org.slf4j.LoggerFactory;
59
60 @VisibleForTesting
61 @SuppressFBWarnings(value = "MC_OVERRIDABLE_METHOD_CALL_IN_CONSTRUCTOR", justification = "Non-final for Mockito.spy()")
62 public class BGPClusterSingletonService implements ClusterSingletonService, AutoCloseable {
63     private static final Logger LOG = LoggerFactory.getLogger(BGPClusterSingletonService.class);
64
65     private final InstanceIdentifier<Bgp> bgpIid;
66     private final BGPTableTypeRegistryConsumer tableTypeRegistry;
67     private final @NonNull ServiceGroupIdentifier serviceGroupIdentifier;
68     private final AtomicBoolean instantiated = new AtomicBoolean(false);
69     private final PeerGroupConfigLoader peerGroupLoader;
70     private final RpcProviderService rpcRegistry;
71     private final RIBExtensionConsumerContext ribExtensionContext;
72     private final BGPDispatcher bgpDispatcher;
73     private final BGPRibRoutingPolicyFactory routingPolicyFactory;
74     private final CodecsRegistry codecsRegistry;
75     private final BGPStateProviderRegistry stateProviderRegistry;
76     private final DOMDataBroker domDataBroker;
77
78     @GuardedBy("this")
79     private final Map<InstanceIdentifier<Neighbor>, PeerBean> peers = new HashMap<>();
80     @GuardedBy("this")
81     private final Map<String, List<PeerBean>> peersGroups = new HashMap<>();
82     @GuardedBy("this")
83     private RibImpl ribImpl;
84     @GuardedBy("this")
85     private ClusterSingletonServiceRegistration cssRegistration;
86
87     BGPClusterSingletonService(
88             final @NonNull PeerGroupConfigLoader peerGroupLoader,
89             final @NonNull ClusterSingletonServiceProvider provider,
90             final @NonNull BGPTableTypeRegistryConsumer tableTypeRegistry,
91             final @NonNull RpcProviderService rpcRegistry,
92             final @NonNull RIBExtensionConsumerContext ribExtensionContext,
93             final @NonNull BGPDispatcher bgpDispatcher,
94             final @NonNull BGPRibRoutingPolicyFactory routingPolicyFactory,
95             final @NonNull CodecsRegistry codecsRegistry,
96             final @NonNull BGPStateProviderRegistry stateProviderRegistry,
97             final @NonNull DOMDataBroker domDataBroker,
98             final @NonNull InstanceIdentifier<Bgp> bgpIid) {
99         this.peerGroupLoader = peerGroupLoader;
100         this.tableTypeRegistry = tableTypeRegistry;
101         this.rpcRegistry = rpcRegistry;
102         this.ribExtensionContext = ribExtensionContext;
103         this.bgpDispatcher = bgpDispatcher;
104         this.routingPolicyFactory = routingPolicyFactory;
105         this.codecsRegistry = codecsRegistry;
106         this.stateProviderRegistry = stateProviderRegistry;
107         this.domDataBroker = domDataBroker;
108         this.bgpIid = bgpIid;
109         serviceGroupIdentifier = ServiceGroupIdentifier.create(getRibInstanceName(bgpIid) + "-service-group");
110         cssRegistration = provider.registerClusterSingletonService(this);
111         LOG.info("BGPClusterSingletonService {} registered", serviceGroupIdentifier.getName());
112     }
113
114     @Override
115     public ServiceGroupIdentifier getIdentifier() {
116         return serviceGroupIdentifier;
117     }
118
119     @Override
120     public synchronized void instantiateServiceInstance() {
121         if (ribImpl != null) {
122             ribImpl.instantiateServiceInstance();
123             peers.values().forEach(PeerBean::instantiateServiceInstance);
124         }
125         instantiated.set(true);
126         LOG.info("BGPClusterSingletonService {} instantiated", serviceGroupIdentifier.getName());
127     }
128
129     @Override
130     public synchronized ListenableFuture<?> closeServiceInstance() {
131         LOG.info("BGPClusterSingletonService {} close service instance", serviceGroupIdentifier.getName());
132         instantiated.set(false);
133
134         final List<ListenableFuture<?>> futurePeerCloseList = peers.values().stream()
135                 .map(PeerBean::closeServiceInstance).collect(Collectors.toList());
136         final SettableFuture<Empty> done = SettableFuture.create();
137
138         final ListenableFuture<List<Object>> futureResult = Futures.allAsList(futurePeerCloseList);
139         Futures.addCallback(futureResult, new FutureCallback<List<?>>() {
140             @Override
141             public void onSuccess(final List<?> result) {
142                 synchronized (BGPClusterSingletonService.this) {
143                     if (ribImpl != null) {
144                         done.setFuture(Futures.transform(ribImpl.closeServiceInstance(),
145                                 input -> Empty.value(), MoreExecutors.directExecutor()));
146                     } else {
147                         done.set(Empty.value());
148                     }
149                 }
150             }
151
152             @Override
153             public void onFailure(final Throwable throwable) {
154                 LOG.warn("Failed removing peers", throwable);
155             }
156         }, MoreExecutors.directExecutor());
157         return done;
158     }
159
160     synchronized void onGlobalChanged(final DataObjectModification<Global> dataObjectModification) {
161         switch (dataObjectModification.getModificationType()) {
162             case DELETE:
163                 LOG.debug("Removing RIB instance: {}", bgpIid);
164                 if (ribImpl != null) {
165                     LOG.debug("RIB instance removed {}", ribImpl);
166                     closeBoundPeers();
167                     closeRibInstance();
168                     ribImpl = null;
169                 }
170                 break;
171             case SUBTREE_MODIFIED:
172             case WRITE:
173                 final Global global = dataObjectModification.getDataAfter();
174                 if (ribImpl == null) {
175                     onGlobalCreated(global);
176                 } else if (!ribImpl.isGlobalEqual(requireNonNull(global))) {
177                     onGlobalUpdated(global);
178                 }
179                 break;
180             default:
181                 break;
182         }
183     }
184
185     @Holding("this")
186     private void onGlobalCreated(final Global global) {
187         LOG.debug("Creating RIB instance with configuration: {}", global);
188         ribImpl = new RibImpl(ribExtensionContext, bgpDispatcher, routingPolicyFactory, codecsRegistry,
189             stateProviderRegistry, domDataBroker);
190         initiateRibInstance(global);
191         LOG.debug("RIB instance created: {}", ribImpl);
192     }
193
194     @Holding("this")
195     private void onGlobalUpdated(final Global global) {
196         LOG.info("Global config {} updated, new configuration {}", global.getConfig().getRouterId(), global);
197         closeRibInstance();
198         initiateRibInstance(global);
199         restartPeers(peers.values());
200     }
201
202     @VisibleForTesting
203     @Holding("this")
204     void closeRibInstance() {
205         try {
206             ribImpl.stop().get();
207         } catch (InterruptedException e) {
208             LOG.error("Interrupted while waiting for RIB instance {} to close", ribImpl.getBgpIdentifier(), e);
209         } catch (ExecutionException e) {
210             LOG.error("RIB instance {} failed to close", ribImpl.getBgpIdentifier(), e);
211         }
212     }
213
214     @VisibleForTesting
215     @Holding("this")
216     void initiateRibInstance(final Global global) {
217         final String ribInstanceName = getRibInstanceName(bgpIid);
218         ribImpl.start(global, ribInstanceName, tableTypeRegistry);
219         if (instantiated.get()) {
220             ribImpl.instantiateServiceInstance();
221         }
222     }
223
224     @Holding("this")
225     private List<PeerBean> closeBoundPeers() {
226         final List<PeerBean> filtered = new ArrayList<>(peers.size());
227         peers.forEach((key, peer) -> {
228             if (closePeer(peer)) {
229                 filtered.add(peer);
230             }
231         });
232         return filtered;
233     }
234
235     @Override
236     public synchronized void close() {
237         if (cssRegistration == null) {
238             // Idempotent as per AutoCloseable contract
239             return;
240         }
241
242         LOG.info("Closing BGPClusterSingletonService {}", serviceGroupIdentifier.getName());
243         cssRegistration.close();
244         cssRegistration = null;
245
246         closeBoundPeers();
247         peers.clear();
248         closeRibInstance();
249         ribImpl = null;
250     }
251
252     @VisibleForTesting
253     @Holding("this")
254     void onNeighborsChanged(final DataObjectModification<Neighbors> dataObjectModification) {
255         for (final DataObjectModification<?> neighborModification : dataObjectModification.getModifiedChildren()) {
256             switch (neighborModification.getModificationType()) {
257                 case DELETE:
258                     onNeighborRemoved((Neighbor) neighborModification.getDataBefore());
259                     break;
260                 case SUBTREE_MODIFIED:
261                 case WRITE:
262                     onNeighborModified((Neighbor) neighborModification.getDataAfter());
263                     break;
264                 default:
265                     break;
266             }
267         }
268     }
269
270     @Holding("this")
271     private void onNeighborModified(final Neighbor neighbor) {
272         //restart peer instance with a new configuration
273         final PeerBean bgpPeer = peers.get(getNeighborInstanceIdentifier(bgpIid, neighbor.key()));
274         if (bgpPeer == null) {
275             onNeighborCreated(neighbor);
276         } else if (!bgpPeer.containsEqualConfiguration(neighbor)) {
277             onNeighborUpdated(bgpPeer, neighbor);
278         }
279     }
280
281     @VisibleForTesting
282     @Holding("this")
283     void onNeighborCreated(final Neighbor neighbor) {
284         LOG.info("Creating Peer instance {} with configuration: {}", neighbor.getNeighborAddress(), neighbor);
285         final PeerBean bgpPeer;
286         if (OpenConfigMappingUtil.isApplicationPeer(neighbor)) {
287             bgpPeer = new AppPeer(stateProviderRegistry);
288         } else {
289             bgpPeer = new BgpPeer(rpcRegistry, stateProviderRegistry);
290         }
291         final InstanceIdentifier<Neighbor> neighborInstanceIdentifier =
292                 getNeighborInstanceIdentifier(bgpIid, neighbor.key());
293         initiatePeerInstance(neighbor, bgpPeer);
294         peers.put(neighborInstanceIdentifier, bgpPeer);
295
296         final Optional<String> peerGroupName = getPeerGroupName(neighbor.getConfig());
297         peerGroupName.ifPresent(s -> peersGroups.computeIfAbsent(s, k -> new ArrayList<>()).add(bgpPeer));
298         LOG.info("Peer instance created {}", neighbor.getNeighborAddress());
299     }
300
301     @VisibleForTesting
302     @Holding("this")
303     void onNeighborUpdated(final PeerBean bgpPeer, final Neighbor neighbor) {
304         LOG.info("Updating Peer {} with new configuration: {}", neighbor.getNeighborAddress(), neighbor);
305         closePeer(bgpPeer);
306         initiatePeerInstance(neighbor, bgpPeer);
307     }
308
309     private static Optional<String> getPeerGroupName(final Config config) {
310         if (config == null) {
311             return Optional.empty();
312         }
313         final NeighborPeerGroupConfig aug = config.augmentation(NeighborPeerGroupConfig.class);
314         if (aug == null || aug.getPeerGroup() == null) {
315             return Optional.empty();
316         }
317         final String peerGroupName = aug.getPeerGroup();
318         if (peerGroupName.equals(APPLICATION_PEER_GROUP_NAME)) {
319             return APPLICATION_PEER_GROUP_NAME_OPT;
320         }
321         return Optional.of(StringUtils.substringBetween(peerGroupName, "=\"", "\""));
322     }
323
324     private static boolean closePeer(final PeerBean bgpPeer) {
325         if (bgpPeer == null) {
326             return false;
327         }
328
329         try {
330             bgpPeer.stop().get();
331         } catch (InterruptedException e) {
332             LOG.error("Interrupted while waiting for peer instance failed to close service", e);
333             return false;
334         } catch (ExecutionException e) {
335             LOG.error("Peer instance failed to close service instance", e);
336             return false;
337         }
338
339         LOG.info("Peer instance {} closed", bgpPeer.getCurrentConfiguration().getNeighborAddress());
340         return true;
341     }
342
343     @VisibleForTesting
344     @Holding("this")
345     public void onNeighborRemoved(final Neighbor neighbor) {
346         LOG.info("Removing Peer instance: {}", neighbor.getNeighborAddress());
347         final PeerBean bgpPeer = peers.remove(getNeighborInstanceIdentifier(bgpIid, neighbor.key()));
348
349         final Optional<String> groupName = getPeerGroupName(neighbor.getConfig());
350         groupName.ifPresent(s -> peersGroups.computeIfPresent(s, (k, groupPeers) -> {
351             groupPeers.remove(bgpPeer);
352             return groupPeers.isEmpty() ? null : groupPeers;
353         }));
354         closePeer(bgpPeer);
355     }
356
357     @VisibleForTesting
358     @Holding("this")
359     // FIXME: synchronized because SpotBugs does not understand @Holding with @VisibleForTesting (which we need for
360     //        Mockito.verify())
361     synchronized void initiatePeerInstance(final Neighbor neighbor, final PeerBean bgpPeer) {
362         if (ribImpl != null) {
363             bgpPeer.start(ribImpl, neighbor, bgpIid, peerGroupLoader, tableTypeRegistry);
364         }
365         if (instantiated.get()) {
366             bgpPeer.instantiateServiceInstance();
367         }
368     }
369
370     @Holding("this")
371     private void restartPeers(final Collection<PeerBean> toRestart) {
372         toRestart.stream().filter(BGPClusterSingletonService::closePeer)
373             .forEach(peer -> initiatePeerInstance(peer.getCurrentConfiguration(), peer));
374     }
375
376     synchronized void restartPeerGroup(final String peerGroupName) {
377         final var toRestart = peersGroups.get(peerGroupName);
378         if (toRestart != null) {
379             restartPeers(toRestart);
380         }
381     }
382 }