2 * Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.protocol.bgp.rib.impl.config;
10 import static java.util.Objects.requireNonNull;
11 import static org.opendaylight.protocol.bgp.rib.impl.config.OpenConfigMappingUtil.APPLICATION_PEER_GROUP_NAME;
12 import static org.opendaylight.protocol.bgp.rib.impl.config.OpenConfigMappingUtil.APPLICATION_PEER_GROUP_NAME_OPT;
13 import static org.opendaylight.protocol.bgp.rib.impl.config.OpenConfigMappingUtil.getNeighborInstanceIdentifier;
14 import static org.opendaylight.protocol.bgp.rib.impl.config.OpenConfigMappingUtil.getRibInstanceName;
16 import com.google.common.annotations.VisibleForTesting;
17 import com.google.common.util.concurrent.FutureCallback;
18 import com.google.common.util.concurrent.Futures;
19 import com.google.common.util.concurrent.ListenableFuture;
20 import com.google.common.util.concurrent.MoreExecutors;
21 import com.google.common.util.concurrent.SettableFuture;
22 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
23 import java.util.ArrayList;
24 import java.util.Collection;
25 import java.util.HashMap;
26 import java.util.List;
28 import java.util.Optional;
29 import java.util.concurrent.ExecutionException;
30 import java.util.concurrent.atomic.AtomicBoolean;
31 import java.util.stream.Collectors;
32 import org.apache.commons.lang3.StringUtils;
33 import org.checkerframework.checker.lock.qual.GuardedBy;
34 import org.checkerframework.checker.lock.qual.Holding;
35 import org.eclipse.jdt.annotation.NonNull;
36 import org.opendaylight.mdsal.binding.api.DataObjectModification;
37 import org.opendaylight.mdsal.binding.api.RpcProviderService;
38 import org.opendaylight.mdsal.dom.api.DOMDataBroker;
39 import org.opendaylight.mdsal.singleton.api.ClusterSingletonService;
40 import org.opendaylight.mdsal.singleton.api.ClusterSingletonServiceProvider;
41 import org.opendaylight.mdsal.singleton.api.ServiceGroupIdentifier;
42 import org.opendaylight.protocol.bgp.openconfig.routing.policy.spi.BGPRibRoutingPolicyFactory;
43 import org.opendaylight.protocol.bgp.openconfig.spi.BGPTableTypeRegistryConsumer;
44 import org.opendaylight.protocol.bgp.rib.impl.spi.BGPDispatcher;
45 import org.opendaylight.protocol.bgp.rib.impl.spi.CodecsRegistry;
46 import org.opendaylight.protocol.bgp.rib.spi.RIBExtensionConsumerContext;
47 import org.opendaylight.protocol.bgp.rib.spi.state.BGPStateProviderRegistry;
48 import org.opendaylight.yang.gen.v1.http.openconfig.net.yang.bgp.rev151009.bgp.neighbor.group.Config;
49 import org.opendaylight.yang.gen.v1.http.openconfig.net.yang.bgp.rev151009.bgp.neighbors.Neighbor;
50 import org.opendaylight.yang.gen.v1.http.openconfig.net.yang.bgp.rev151009.bgp.top.Bgp;
51 import org.opendaylight.yang.gen.v1.http.openconfig.net.yang.bgp.rev151009.bgp.top.bgp.Global;
52 import org.opendaylight.yang.gen.v1.http.openconfig.net.yang.bgp.rev151009.bgp.top.bgp.Neighbors;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.openconfig.extensions.rev180329.network.instance.protocol.NeighborPeerGroupConfig;
54 import org.opendaylight.yangtools.concepts.Registration;
55 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
56 import org.opendaylight.yangtools.yang.common.Empty;
57 import org.slf4j.Logger;
58 import org.slf4j.LoggerFactory;
61 @SuppressFBWarnings(value = "MC_OVERRIDABLE_METHOD_CALL_IN_CONSTRUCTOR", justification = "Non-final for Mockito.spy()")
62 public class BGPClusterSingletonService implements ClusterSingletonService, AutoCloseable {
63 private static final Logger LOG = LoggerFactory.getLogger(BGPClusterSingletonService.class);
65 private final InstanceIdentifier<Bgp> bgpIid;
66 private final BGPTableTypeRegistryConsumer tableTypeRegistry;
67 private final @NonNull ServiceGroupIdentifier serviceGroupIdentifier;
68 private final AtomicBoolean instantiated = new AtomicBoolean(false);
69 private final PeerGroupConfigLoader peerGroupLoader;
70 private final RpcProviderService rpcRegistry;
71 private final RIBExtensionConsumerContext ribExtensionContext;
72 private final BGPDispatcher bgpDispatcher;
73 private final BGPRibRoutingPolicyFactory routingPolicyFactory;
74 private final CodecsRegistry codecsRegistry;
75 private final BGPStateProviderRegistry stateProviderRegistry;
76 private final DOMDataBroker domDataBroker;
79 private final Map<InstanceIdentifier<Neighbor>, PeerBean> peers = new HashMap<>();
81 private final Map<String, List<PeerBean>> peersGroups = new HashMap<>();
83 private RibImpl ribImpl;
85 private Registration cssRegistration;
87 BGPClusterSingletonService(
88 final @NonNull PeerGroupConfigLoader peerGroupLoader,
89 final @NonNull ClusterSingletonServiceProvider provider,
90 final @NonNull BGPTableTypeRegistryConsumer tableTypeRegistry,
91 final @NonNull RpcProviderService rpcRegistry,
92 final @NonNull RIBExtensionConsumerContext ribExtensionContext,
93 final @NonNull BGPDispatcher bgpDispatcher,
94 final @NonNull BGPRibRoutingPolicyFactory routingPolicyFactory,
95 final @NonNull CodecsRegistry codecsRegistry,
96 final @NonNull BGPStateProviderRegistry stateProviderRegistry,
97 final @NonNull DOMDataBroker domDataBroker,
98 final @NonNull InstanceIdentifier<Bgp> bgpIid) {
99 this.peerGroupLoader = peerGroupLoader;
100 this.tableTypeRegistry = tableTypeRegistry;
101 this.rpcRegistry = rpcRegistry;
102 this.ribExtensionContext = ribExtensionContext;
103 this.bgpDispatcher = bgpDispatcher;
104 this.routingPolicyFactory = routingPolicyFactory;
105 this.codecsRegistry = codecsRegistry;
106 this.stateProviderRegistry = stateProviderRegistry;
107 this.domDataBroker = domDataBroker;
108 this.bgpIid = bgpIid;
109 serviceGroupIdentifier = new ServiceGroupIdentifier(getRibInstanceName(bgpIid) + "-service-group");
110 cssRegistration = provider.registerClusterSingletonService(this);
111 LOG.info("BGPClusterSingletonService {} registered", serviceGroupIdentifier.value());
115 public ServiceGroupIdentifier getIdentifier() {
116 return serviceGroupIdentifier;
120 public synchronized void instantiateServiceInstance() {
121 if (ribImpl != null) {
122 ribImpl.instantiateServiceInstance();
123 peers.values().forEach(PeerBean::instantiateServiceInstance);
125 instantiated.set(true);
126 LOG.info("BGPClusterSingletonService {} instantiated", serviceGroupIdentifier.value());
130 public synchronized ListenableFuture<?> closeServiceInstance() {
131 LOG.info("BGPClusterSingletonService {} close service instance", serviceGroupIdentifier.value());
132 instantiated.set(false);
134 final List<ListenableFuture<?>> futurePeerCloseList = peers.values().stream()
135 .map(PeerBean::closeServiceInstance).collect(Collectors.toList());
136 final SettableFuture<Empty> done = SettableFuture.create();
138 final ListenableFuture<List<Object>> futureResult = Futures.allAsList(futurePeerCloseList);
139 Futures.addCallback(futureResult, new FutureCallback<List<?>>() {
141 public void onSuccess(final List<?> result) {
142 synchronized (BGPClusterSingletonService.this) {
143 if (ribImpl != null) {
144 done.setFuture(Futures.transform(ribImpl.closeServiceInstance(),
145 input -> Empty.value(), MoreExecutors.directExecutor()));
147 done.set(Empty.value());
153 public void onFailure(final Throwable throwable) {
154 LOG.warn("Failed removing peers", throwable);
156 }, MoreExecutors.directExecutor());
160 synchronized void onGlobalChanged(final DataObjectModification<Global> dataObjectModification) {
161 switch (dataObjectModification.modificationType()) {
163 LOG.debug("Removing RIB instance: {}", bgpIid);
164 if (ribImpl != null) {
165 LOG.debug("RIB instance removed {}", ribImpl);
171 case SUBTREE_MODIFIED:
173 final Global global = dataObjectModification.dataAfter();
174 if (ribImpl == null) {
175 onGlobalCreated(global);
176 } else if (!ribImpl.isGlobalEqual(requireNonNull(global))) {
177 onGlobalUpdated(global);
186 private void onGlobalCreated(final Global global) {
187 LOG.debug("Creating RIB instance with configuration: {}", global);
188 ribImpl = new RibImpl(ribExtensionContext, bgpDispatcher, routingPolicyFactory, codecsRegistry,
189 stateProviderRegistry, domDataBroker);
190 initiateRibInstance(global);
191 LOG.debug("RIB instance created: {}", ribImpl);
195 private void onGlobalUpdated(final Global global) {
196 LOG.info("Global config {} updated, new configuration {}", global.getConfig().getRouterId(), global);
198 initiateRibInstance(global);
199 restartPeers(peers.values());
204 void closeRibInstance() {
206 ribImpl.stop().get();
207 } catch (InterruptedException e) {
208 LOG.error("Interrupted while waiting for RIB instance {} to close", ribImpl.getBgpIdentifier(), e);
209 } catch (ExecutionException e) {
210 LOG.error("RIB instance {} failed to close", ribImpl.getBgpIdentifier(), e);
216 void initiateRibInstance(final Global global) {
217 final String ribInstanceName = getRibInstanceName(bgpIid);
218 ribImpl.start(global, ribInstanceName, tableTypeRegistry);
219 if (instantiated.get()) {
220 ribImpl.instantiateServiceInstance();
225 private List<PeerBean> closeBoundPeers() {
226 final List<PeerBean> filtered = new ArrayList<>(peers.size());
227 peers.forEach((key, peer) -> {
228 if (closePeer(peer)) {
236 public synchronized void close() {
237 if (cssRegistration == null) {
238 // Idempotent as per AutoCloseable contract
242 LOG.info("Closing BGPClusterSingletonService {}", serviceGroupIdentifier.value());
243 cssRegistration.close();
244 cssRegistration = null;
254 void onNeighborsChanged(final DataObjectModification<Neighbors> dataObjectModification) {
255 for (var neighborModification : dataObjectModification.modifiedChildren()) {
256 switch (neighborModification.modificationType()) {
258 onNeighborRemoved((Neighbor) neighborModification.dataBefore());
260 case SUBTREE_MODIFIED:
262 onNeighborModified((Neighbor) neighborModification.dataAfter());
271 private void onNeighborModified(final Neighbor neighbor) {
272 //restart peer instance with a new configuration
273 final PeerBean bgpPeer = peers.get(getNeighborInstanceIdentifier(bgpIid, neighbor.key()));
274 if (bgpPeer == null) {
275 onNeighborCreated(neighbor);
276 } else if (!bgpPeer.containsEqualConfiguration(neighbor)) {
277 onNeighborUpdated(bgpPeer, neighbor);
283 void onNeighborCreated(final Neighbor neighbor) {
284 LOG.info("Creating Peer instance {} with configuration: {}", neighbor.getNeighborAddress(), neighbor);
285 final PeerBean bgpPeer;
286 if (OpenConfigMappingUtil.isApplicationPeer(neighbor)) {
287 bgpPeer = new AppPeer(stateProviderRegistry);
289 bgpPeer = new BgpPeer(rpcRegistry, stateProviderRegistry);
291 final InstanceIdentifier<Neighbor> neighborInstanceIdentifier =
292 getNeighborInstanceIdentifier(bgpIid, neighbor.key());
293 initiatePeerInstance(neighbor, bgpPeer);
294 peers.put(neighborInstanceIdentifier, bgpPeer);
296 final Optional<String> peerGroupName = getPeerGroupName(neighbor.getConfig());
297 peerGroupName.ifPresent(s -> peersGroups.computeIfAbsent(s, k -> new ArrayList<>()).add(bgpPeer));
298 LOG.info("Peer instance created {}", neighbor.getNeighborAddress());
303 void onNeighborUpdated(final PeerBean bgpPeer, final Neighbor neighbor) {
304 LOG.info("Updating Peer {} with new configuration: {}", neighbor.getNeighborAddress(), neighbor);
306 initiatePeerInstance(neighbor, bgpPeer);
309 private static Optional<String> getPeerGroupName(final Config config) {
310 if (config == null) {
311 return Optional.empty();
313 final NeighborPeerGroupConfig aug = config.augmentation(NeighborPeerGroupConfig.class);
314 if (aug == null || aug.getPeerGroup() == null) {
315 return Optional.empty();
317 final String peerGroupName = aug.getPeerGroup();
318 if (peerGroupName.equals(APPLICATION_PEER_GROUP_NAME)) {
319 return APPLICATION_PEER_GROUP_NAME_OPT;
321 return Optional.of(StringUtils.substringBetween(peerGroupName, "=\"", "\""));
324 private static boolean closePeer(final PeerBean bgpPeer) {
325 if (bgpPeer == null) {
330 bgpPeer.stop().get();
331 } catch (InterruptedException e) {
332 LOG.error("Interrupted while waiting for peer instance failed to close service", e);
334 } catch (ExecutionException e) {
335 LOG.error("Peer instance failed to close service instance", e);
339 LOG.info("Peer instance {} closed", bgpPeer.getCurrentConfiguration().getNeighborAddress());
345 public void onNeighborRemoved(final Neighbor neighbor) {
346 LOG.info("Removing Peer instance: {}", neighbor.getNeighborAddress());
347 final PeerBean bgpPeer = peers.remove(getNeighborInstanceIdentifier(bgpIid, neighbor.key()));
349 final Optional<String> groupName = getPeerGroupName(neighbor.getConfig());
350 groupName.ifPresent(s -> peersGroups.computeIfPresent(s, (k, groupPeers) -> {
351 groupPeers.remove(bgpPeer);
352 return groupPeers.isEmpty() ? null : groupPeers;
359 // FIXME: synchronized because SpotBugs does not understand @Holding with @VisibleForTesting (which we need for
361 synchronized void initiatePeerInstance(final Neighbor neighbor, final PeerBean bgpPeer) {
362 if (ribImpl != null) {
363 bgpPeer.start(ribImpl, neighbor, bgpIid, peerGroupLoader, tableTypeRegistry);
365 if (instantiated.get()) {
366 bgpPeer.instantiateServiceInstance();
371 private void restartPeers(final Collection<PeerBean> toRestart) {
372 toRestart.stream().filter(BGPClusterSingletonService::closePeer)
373 .forEach(peer -> initiatePeerInstance(peer.getCurrentConfiguration(), peer));
376 synchronized void restartPeerGroup(final String peerGroupName) {
377 final var toRestart = peersGroups.get(peerGroupName);
378 if (toRestart != null) {
379 restartPeers(toRestart);