Migrate to MD-SAL APIs
[bgpcep.git] / bgp / rib-impl / src / main / java / org / opendaylight / protocol / bgp / rib / impl / config / BGPClusterSingletonService.java
1 /*
2  * Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.protocol.bgp.rib.impl.config;
9
10 import static org.opendaylight.protocol.bgp.rib.impl.config.OpenConfigMappingUtil.APPLICATION_PEER_GROUP_NAME;
11 import static org.opendaylight.protocol.bgp.rib.impl.config.OpenConfigMappingUtil.APPLICATION_PEER_GROUP_NAME_OPT;
12 import static org.opendaylight.protocol.bgp.rib.impl.config.OpenConfigMappingUtil.getNeighborInstanceIdentifier;
13 import static org.opendaylight.protocol.bgp.rib.impl.config.OpenConfigMappingUtil.getRibInstanceName;
14
15 import com.google.common.util.concurrent.FutureCallback;
16 import com.google.common.util.concurrent.Futures;
17 import com.google.common.util.concurrent.ListenableFuture;
18 import com.google.common.util.concurrent.MoreExecutors;
19 import com.google.common.util.concurrent.SettableFuture;
20 import java.util.ArrayList;
21 import java.util.HashMap;
22 import java.util.List;
23 import java.util.Map;
24 import java.util.Optional;
25 import java.util.concurrent.TimeUnit;
26 import java.util.concurrent.atomic.AtomicBoolean;
27 import java.util.stream.Collectors;
28 import org.apache.commons.lang3.StringUtils;
29 import org.checkerframework.checker.lock.qual.GuardedBy;
30 import org.checkerframework.checker.lock.qual.Holding;
31 import org.eclipse.jdt.annotation.NonNull;
32 import org.opendaylight.mdsal.binding.api.DataBroker;
33 import org.opendaylight.mdsal.binding.api.DataObjectModification;
34 import org.opendaylight.mdsal.binding.api.RpcProviderService;
35 import org.opendaylight.mdsal.binding.dom.codec.api.BindingCodecTreeFactory;
36 import org.opendaylight.mdsal.common.api.CommitInfo;
37 import org.opendaylight.mdsal.dom.api.DOMDataBroker;
38 import org.opendaylight.mdsal.dom.api.DOMSchemaService;
39 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonService;
40 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
41 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
42 import org.opendaylight.protocol.bgp.openconfig.routing.policy.spi.BGPRibRoutingPolicyFactory;
43 import org.opendaylight.protocol.bgp.openconfig.spi.BGPTableTypeRegistryConsumer;
44 import org.opendaylight.protocol.bgp.rib.impl.spi.BGPDispatcher;
45 import org.opendaylight.protocol.bgp.rib.spi.RIBExtensionConsumerContext;
46 import org.opendaylight.protocol.bgp.rib.spi.util.ClusterSingletonServiceRegistrationHelper;
47 import org.opendaylight.yang.gen.v1.http.openconfig.net.yang.bgp.rev151009.bgp.neighbor.group.Config;
48 import org.opendaylight.yang.gen.v1.http.openconfig.net.yang.bgp.rev151009.bgp.neighbors.Neighbor;
49 import org.opendaylight.yang.gen.v1.http.openconfig.net.yang.bgp.rev151009.bgp.top.Bgp;
50 import org.opendaylight.yang.gen.v1.http.openconfig.net.yang.bgp.rev151009.bgp.top.bgp.Global;
51 import org.opendaylight.yang.gen.v1.http.openconfig.net.yang.bgp.rev151009.bgp.top.bgp.Neighbors;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.openconfig.extensions.rev180329.NeighborPeerGroupConfig;
53 import org.opendaylight.yangtools.yang.binding.DataObject;
54 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
55 import org.slf4j.Logger;
56 import org.slf4j.LoggerFactory;
57
58 public final class BGPClusterSingletonService implements ClusterSingletonService, AutoCloseable {
59
60     private static final Logger LOG = LoggerFactory.getLogger(BGPClusterSingletonService.class);
61
62     private static final long TIMEOUT_NS = TimeUnit.SECONDS.toNanos(5);
63     private final InstanceIdentifier<Bgp> bgpIid;
64     @GuardedBy("this")
65     private final Map<InstanceIdentifier<Neighbor>, PeerBean> peers = new HashMap<>();
66     @GuardedBy("this")
67     private final Map<String, List<PeerBean>> peersGroups = new HashMap<>();
68     private final BGPTableTypeRegistryConsumer tableTypeRegistry;
69     private final ServiceGroupIdentifier serviceGroupIdentifier;
70     private final AtomicBoolean instantiated = new AtomicBoolean(false);
71     private final PeerGroupConfigLoader peerGroupLoader;
72     private RibImpl ribImpl;
73     private final RIBExtensionConsumerContext ribExtensionContext;
74     private final BGPDispatcher dispatcher;
75     private final BGPRibRoutingPolicyFactory policyFactory;
76     private final BindingCodecTreeFactory codecFactory;
77     private final DOMDataBroker domBroker;
78     private final DataBroker dataBroker;
79     private final DOMSchemaService schemaService;
80     private final RpcProviderService rpcRegistry;
81
82     BGPClusterSingletonService(
83             final @NonNull PeerGroupConfigLoader peerGroupLoader,
84             final @NonNull ClusterSingletonServiceProvider provider,
85             final @NonNull BGPTableTypeRegistryConsumer tableTypeRegistry,
86             final @NonNull InstanceIdentifier<Bgp> bgpIid,
87             final @NonNull RIBExtensionConsumerContext ribExtensionContext,
88             final @NonNull BGPDispatcher dispatcher,
89             final @NonNull BGPRibRoutingPolicyFactory policyFactory,
90             final @NonNull BindingCodecTreeFactory codecFactory,
91             final @NonNull DOMDataBroker domBroker,
92             final @NonNull DataBroker dataBroker,
93             final @NonNull DOMSchemaService schemaService,
94             final @NonNull RpcProviderService rpcRegistry) {
95         this.peerGroupLoader = peerGroupLoader;
96         this.tableTypeRegistry = tableTypeRegistry;
97         this.bgpIid = bgpIid;
98         this.ribExtensionContext = ribExtensionContext;
99         this.dispatcher = dispatcher;
100         this.policyFactory = policyFactory;
101         this.codecFactory = codecFactory;
102         this.domBroker = domBroker;
103         this.dataBroker = dataBroker;
104         this.schemaService = schemaService;
105         this.rpcRegistry = rpcRegistry;
106         final String ribInstanceName = getRibInstanceName(bgpIid);
107         this.serviceGroupIdentifier = ServiceGroupIdentifier.create(ribInstanceName + "-service-group");
108         LOG.info("BGPClusterSingletonService {} registered", this.serviceGroupIdentifier.getValue());
109         ClusterSingletonServiceRegistrationHelper
110                 .registerSingletonService(provider, this);
111     }
112
113     @Override
114     public synchronized void instantiateServiceInstance() {
115         if (this.ribImpl != null) {
116             this.ribImpl.instantiateServiceInstance();
117             this.peers.values().forEach(PeerBean::instantiateServiceInstance);
118         }
119         this.instantiated.set(true);
120         LOG.info("BGPClusterSingletonService {} instantiated", this.serviceGroupIdentifier.getValue());
121     }
122
123     @Override
124     public synchronized ListenableFuture<? extends CommitInfo> closeServiceInstance() {
125         LOG.info("BGPClusterSingletonService {} close service instance", this.serviceGroupIdentifier.getValue());
126         this.instantiated.set(false);
127
128         final List<ListenableFuture<? extends CommitInfo>> futurePeerCloseList = this.peers.values().stream()
129                 .map(PeerBean::closeServiceInstance).collect(Collectors.toList());
130         final SettableFuture<? extends CommitInfo> done = SettableFuture.create();
131
132         final ListenableFuture<List<CommitInfo>> futureResult = Futures.allAsList(futurePeerCloseList);
133         Futures.addCallback(futureResult, new FutureCallback<List<? extends CommitInfo>>() {
134             @Override
135             public void onSuccess(final List<? extends CommitInfo> result) {
136                 done.setFuture(Futures.transform(BGPClusterSingletonService.this.ribImpl.closeServiceInstance(),
137                     input -> null, MoreExecutors.directExecutor()));
138             }
139
140             @Override
141             public void onFailure(final Throwable throwable) {
142                 LOG.warn("Failed removing peers", throwable);
143             }
144         }, MoreExecutors.directExecutor());
145         return done;
146     }
147
148     @Override
149     public ServiceGroupIdentifier getIdentifier() {
150         return this.serviceGroupIdentifier;
151     }
152
153     synchronized void onGlobalChanged(final DataObjectModification<Global> dataObjectModification) {
154         switch (dataObjectModification.getModificationType()) {
155             case DELETE:
156                 LOG.debug("Removing RIB instance: {}", this.bgpIid);
157                 if (this.ribImpl != null) {
158                     LOG.debug("RIB instance removed {}", this.ribImpl);
159                     closeAllBindedPeers();
160                     closeRibService();
161                     this.ribImpl = null;
162                 }
163                 break;
164             case SUBTREE_MODIFIED:
165             case WRITE:
166                 final Global global = dataObjectModification.getDataAfter();
167                 if (this.ribImpl == null) {
168                     onGlobalCreated(global);
169                 } else if (!this.ribImpl.isGlobalEqual(global)) {
170                     onGlobalUpdated(global);
171                 }
172                 break;
173             default:
174                 break;
175         }
176     }
177
178     private synchronized void onGlobalCreated(final Global global) {
179         LOG.debug("Creating RIB instance with configuration: {}", global);
180         this.ribImpl = new RibImpl(ribExtensionContext, dispatcher, policyFactory, codecFactory, domBroker, dataBroker,
181                 schemaService);
182         initiateRibInstance(global);
183         LOG.debug("RIB instance created: {}", this.ribImpl);
184     }
185
186     private synchronized void onGlobalUpdated(final Global global) {
187         LOG.debug("Modifying RIB instance with configuration: {}", global);
188         final List<PeerBean> closedPeers = closeAllBindedPeers();
189         closeRibService();
190         initiateRibInstance(global);
191         for (final PeerBean peer : closedPeers) {
192             peer.restart(this.ribImpl, this.bgpIid, this.peerGroupLoader, this.tableTypeRegistry);
193         }
194         if (this.instantiated.get()) {
195             closedPeers.forEach(PeerBean::instantiateServiceInstance);
196         }
197         LOG.debug("RIB instance created: {}", this.ribImpl);
198     }
199
200     @SuppressWarnings("checkstyle:illegalCatch")
201     private void closeRibService() {
202         try {
203             this.ribImpl.closeServiceInstance().get(TIMEOUT_NS, TimeUnit.NANOSECONDS);
204         } catch (final Exception e) {
205             LOG.error("RIB instance failed to close service instance", e);
206         }
207         this.ribImpl.close();
208     }
209
210     @Holding("this")
211     private void initiateRibInstance(final Global global) {
212         final String ribInstanceName = getRibInstanceName(this.bgpIid);
213         ribImpl.start(global, ribInstanceName, this.tableTypeRegistry);
214         if (this.instantiated.get()) {
215             this.ribImpl.instantiateServiceInstance();
216         }
217     }
218
219     @SuppressWarnings("checkstyle:illegalCatch")
220     private synchronized List<PeerBean> closeAllBindedPeers() {
221         final List<PeerBean> filtered = new ArrayList<>();
222         this.peers.forEach((key, peer) -> {
223             try {
224                 peer.closeServiceInstance().get();
225             } catch (final Exception e) {
226                 LOG.error("Peer instance failed to close service instance", e);
227             }
228             peer.close();
229             filtered.add(peer);
230         });
231         return filtered;
232     }
233
234     @Override
235     public void close() {
236         LOG.info("BGPClusterSingletonService {} close", this.serviceGroupIdentifier.getValue());
237         this.peers.values().iterator().forEachRemaining(PeerBean::close);
238         this.ribImpl.close();
239         this.peers.clear();
240         this.ribImpl = null;
241     }
242
243     synchronized void onNeighborsChanged(final DataObjectModification<Neighbors> dataObjectModification) {
244         for (final DataObjectModification<? extends DataObject> neighborModification :
245                 dataObjectModification.getModifiedChildren()) {
246             switch (neighborModification.getModificationType()) {
247                 case DELETE:
248                     onNeighborRemoved((Neighbor) neighborModification.getDataBefore());
249                     break;
250                 case SUBTREE_MODIFIED:
251                 case WRITE:
252                     onNeighborModified((Neighbor) neighborModification.getDataAfter());
253                     break;
254                 default:
255                     break;
256             }
257         }
258     }
259
260     private synchronized void onNeighborModified(final Neighbor neighbor) {
261         //restart peer instance with a new configuration
262         final PeerBean bgpPeer = this.peers.get(getNeighborInstanceIdentifier(this.bgpIid, neighbor.key()));
263         if (bgpPeer == null) {
264             onNeighborCreated(neighbor);
265         } else if (!bgpPeer.containsEqualConfiguration(neighbor)) {
266             onNeighborUpdated(bgpPeer, neighbor);
267         }
268     }
269
270     private synchronized void onNeighborCreated(final Neighbor neighbor) {
271         LOG.debug("Creating Peer instance with configuration: {}", neighbor);
272         final PeerBean bgpPeer;
273         if (OpenConfigMappingUtil.isApplicationPeer(neighbor)) {
274             bgpPeer = new AppPeer();
275         } else {
276             bgpPeer = new BgpPeer(this.rpcRegistry);
277         }
278         final InstanceIdentifier<Neighbor> neighborInstanceIdentifier =
279                 getNeighborInstanceIdentifier(this.bgpIid, neighbor.key());
280         initiatePeerInstance(neighborInstanceIdentifier, neighbor, bgpPeer);
281         this.peers.put(neighborInstanceIdentifier, bgpPeer);
282
283         final Optional<String> peerGroupName = getPeerGroupName(neighbor.getConfig());
284         peerGroupName.ifPresent(s -> this.peersGroups.computeIfAbsent(s, k -> new ArrayList<>()).add(bgpPeer));
285         LOG.debug("Peer instance created {}", neighbor.key().getNeighborAddress());
286     }
287
288     private static Optional<String> getPeerGroupName(final Config config) {
289         if (config == null) {
290             return Optional.empty();
291         }
292         final NeighborPeerGroupConfig aug = config.augmentation(NeighborPeerGroupConfig.class);
293         if (aug == null || aug.getPeerGroup() == null) {
294             return Optional.empty();
295         }
296         final String peerGroupName = aug.getPeerGroup();
297         if (peerGroupName.equals(APPLICATION_PEER_GROUP_NAME)) {
298             return APPLICATION_PEER_GROUP_NAME_OPT;
299         }
300         return Optional.of(StringUtils.substringBetween(peerGroupName, "=\"", "\""));
301     }
302
303     private synchronized void onNeighborUpdated(final PeerBean bgpPeer, final Neighbor neighbor) {
304         LOG.debug("Updating Peer instance with configuration: {}", neighbor);
305         closePeer(bgpPeer);
306
307         final InstanceIdentifier<Neighbor> neighborInstanceIdentifier
308                 = getNeighborInstanceIdentifier(this.bgpIid, neighbor.key());
309         initiatePeerInstance(neighborInstanceIdentifier, neighbor, bgpPeer);
310         LOG.debug("Peer instance updated {}", bgpPeer);
311     }
312
313     @SuppressWarnings("checkstyle:illegalCatch")
314     private static void closePeer(final PeerBean bgpPeer) {
315         if (bgpPeer != null) {
316             try {
317                 bgpPeer.closeServiceInstance().get();
318                 bgpPeer.close();
319                 LOG.debug("Peer instance closed {}", bgpPeer);
320             } catch (final Exception e) {
321                 LOG.error("Peer instance failed to close service instance", e);
322             }
323         }
324     }
325
326     private synchronized void onNeighborRemoved(final Neighbor neighbor) {
327         LOG.debug("Removing Peer instance: {}", neighbor);
328         final PeerBean bgpPeer = this.peers.remove(getNeighborInstanceIdentifier(this.bgpIid, neighbor.key()));
329
330         final Optional<String> groupName = getPeerGroupName(neighbor.getConfig());
331         groupName.ifPresent(s -> this.peersGroups.computeIfPresent(s, (k, groupPeers) -> {
332             groupPeers.remove(bgpPeer);
333             return groupPeers.isEmpty() ? null : groupPeers;
334         }));
335         closePeer(bgpPeer);
336     }
337
338     private synchronized void initiatePeerInstance(final InstanceIdentifier<Neighbor> neighborIdentifier,
339             final Neighbor neighbor, final PeerBean bgpPeer) {
340         if (this.ribImpl != null) {
341             bgpPeer.start(this.ribImpl, neighbor, this.bgpIid, this.peerGroupLoader, this.tableTypeRegistry);
342         }
343         if (this.instantiated.get()) {
344             bgpPeer.instantiateServiceInstance();
345         }
346     }
347
348     @SuppressWarnings("checkstyle:illegalCatch")
349     synchronized void restartNeighbors(final String peerGroupName) {
350         final List<PeerBean> peerGroup = this.peersGroups.get(peerGroupName);
351         if (peerGroup == null) {
352             return;
353         }
354         for (final PeerBean peer : peerGroup) {
355             try {
356                 peer.closeServiceInstance().get();
357             } catch (final Exception e) {
358                 LOG.error("Peer instance failed to close service instance", e);
359             }
360             peer.restart(this.ribImpl, this.bgpIid, this.peerGroupLoader, this.tableTypeRegistry);
361             peer.instantiateServiceInstance();
362         }
363     }
364 }