Fix modernization issues
[controller.git] / opendaylight / blueprint / src / main / java / org / opendaylight / controller / blueprint / BlueprintContainerRestartServiceImpl.java
1 /*
2  * Copyright (c) 2016 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.blueprint;
9
10 import static java.util.Objects.requireNonNull;
11
12 import com.google.common.collect.Lists;
13 import com.google.common.util.concurrent.ThreadFactoryBuilder;
14 import java.util.ArrayDeque;
15 import java.util.ArrayList;
16 import java.util.Arrays;
17 import java.util.Collections;
18 import java.util.Deque;
19 import java.util.Hashtable;
20 import java.util.LinkedHashSet;
21 import java.util.List;
22 import java.util.Set;
23 import java.util.concurrent.CountDownLatch;
24 import java.util.concurrent.ExecutorService;
25 import java.util.concurrent.Executors;
26 import java.util.concurrent.TimeUnit;
27 import org.apache.aries.blueprint.services.BlueprintExtenderService;
28 import org.apache.aries.quiesce.participant.QuiesceParticipant;
29 import org.apache.aries.util.AriesFrameworkUtil;
30 import org.osgi.framework.Bundle;
31 import org.osgi.framework.BundleContext;
32 import org.osgi.framework.ServiceReference;
33 import org.osgi.framework.ServiceRegistration;
34 import org.osgi.service.blueprint.container.BlueprintEvent;
35 import org.osgi.service.blueprint.container.BlueprintListener;
36 import org.slf4j.Logger;
37 import org.slf4j.LoggerFactory;
38
39 /**
40  * Implementation of the BlueprintContainerRestartService.
41  *
42  * @author Thomas Pantelis
43  */
44 class BlueprintContainerRestartServiceImpl implements AutoCloseable, BlueprintContainerRestartService {
45     private static final Logger LOG = LoggerFactory.getLogger(BlueprintContainerRestartServiceImpl.class);
46     private static final int CONTAINER_CREATE_TIMEOUT_IN_MINUTES = 5;
47
48     private final ExecutorService restartExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
49             .setDaemon(true).setNameFormat("BlueprintContainerRestartService").build());
50
51     private BlueprintExtenderService blueprintExtenderService;
52     private QuiesceParticipant quiesceParticipant;
53
54     void setBlueprintExtenderService(final BlueprintExtenderService blueprintExtenderService) {
55         this.blueprintExtenderService = blueprintExtenderService;
56     }
57
58     void setQuiesceParticipant(final QuiesceParticipant quiesceParticipant) {
59         this.quiesceParticipant = quiesceParticipant;
60     }
61
62     public void restartContainer(final Bundle bundle, final List<Object> paths) {
63         LOG.debug("restartContainer for bundle {}", bundle);
64
65         if (restartExecutor.isShutdown()) {
66             LOG.debug("Already closed - returning");
67             return;
68         }
69
70         restartExecutor.execute(() -> {
71             blueprintExtenderService.destroyContainer(bundle, blueprintExtenderService.getContainer(bundle));
72             blueprintExtenderService.createContainer(bundle, paths);
73         });
74     }
75
76     @Override
77     public void restartContainerAndDependents(final Bundle bundle) {
78         if (restartExecutor.isShutdown()) {
79             return;
80         }
81
82         LOG.debug("restartContainerAndDependents for bundle {}", bundle);
83
84         restartExecutor.execute(() -> restartContainerAndDependentsInternal(bundle));
85     }
86
87     private void restartContainerAndDependentsInternal(final Bundle forBundle) {
88         requireNonNull(blueprintExtenderService);
89         requireNonNull(quiesceParticipant);
90
91         // We use a LinkedHashSet to preserve insertion order as we walk the service usage hierarchy.
92         Set<Bundle> containerBundlesSet = new LinkedHashSet<>();
93         findDependentContainersRecursively(forBundle, containerBundlesSet);
94
95         List<Bundle> containerBundles = new ArrayList<>(containerBundlesSet);
96
97         LOG.info("Restarting blueprint containers for bundle {} and its dependent bundles {}", forBundle,
98                 containerBundles.subList(1, containerBundles.size()));
99
100         // The blueprint containers are created asynchronously so we register a handler for blueprint events
101         // that are sent when a container is complete, successful or not. The CountDownLatch tells when all
102         // containers are complete. This is done to ensure all blueprint containers are finished before we
103         // restart config modules.
104         final CountDownLatch containerCreationComplete = new CountDownLatch(containerBundles.size());
105         ServiceRegistration<?> eventHandlerReg = registerEventHandler(forBundle.getBundleContext(), event -> {
106             final Bundle bundle = event.getBundle();
107             if (event.isReplay()) {
108                 LOG.trace("Got replay BlueprintEvent {} for bundle {}", event.getType(), bundle);
109                 return;
110             }
111
112             LOG.debug("Got BlueprintEvent {} for bundle {}", event.getType(), bundle);
113             if (containerBundles.contains(bundle)
114                     && (event.getType() == BlueprintEvent.CREATED || event.getType() == BlueprintEvent.FAILURE)) {
115                 containerCreationComplete.countDown();
116                 LOG.debug("containerCreationComplete is now {}", containerCreationComplete.getCount());
117             }
118         });
119
120         final Runnable createContainerCallback = () -> createContainers(containerBundles);
121
122         // Destroy the container down-top recursively and once done, restart the container top-down
123         destroyContainers(new ArrayDeque<>(Lists.reverse(containerBundles)), createContainerCallback);
124
125
126         try {
127             if (!containerCreationComplete.await(CONTAINER_CREATE_TIMEOUT_IN_MINUTES, TimeUnit.MINUTES)) {
128                 LOG.warn("Failed to restart all blueprint containers within {} minutes. Attempted to restart {} {} "
129                         + "but only {} completed restart", CONTAINER_CREATE_TIMEOUT_IN_MINUTES, containerBundles.size(),
130                         containerBundles, containerBundles.size() - containerCreationComplete.getCount());
131                 return;
132             }
133         } catch (final InterruptedException e) {
134             LOG.debug("CountDownLatch await was interrupted - returning");
135             return;
136         }
137
138         AriesFrameworkUtil.safeUnregisterService(eventHandlerReg);
139
140         LOG.info("Finished restarting blueprint containers for bundle {} and its dependent bundles", forBundle);
141     }
142
143     /**
144      * Recursively quiesce and destroy the bundles one by one in order to maintain synchronicity and ordering.
145      * @param remainingBundlesToDestroy the list of remaining bundles to destroy.
146      * @param createContainerCallback a {@link Runnable} to {@code run()} when the recursive function is completed.
147      */
148     private void destroyContainers(final Deque<Bundle> remainingBundlesToDestroy,
149             final Runnable createContainerCallback) {
150
151         final Bundle nextBundle;
152         synchronized (remainingBundlesToDestroy) {
153             if (remainingBundlesToDestroy.isEmpty()) {
154                 LOG.debug("All blueprint containers were quiesced and destroyed");
155                 createContainerCallback.run();
156                 return;
157             }
158
159             nextBundle = remainingBundlesToDestroy.poll();
160         }
161
162         // The Quiesce capability is a like a soft-stop, clean-stop. In the case of the Blueprint extender, in flight
163         // service calls are allowed to finish; they're counted in and counted out, and no new calls are allowed. When
164         // there are no in flight service calls, the bundle is told to stop. The Blueprint bundle itself doesn't know
165         // this is happening which is a key design point. In the case of Blueprint, the extender ensures no new Entity
166         // Managers(EMs) are created. Then when all those EMs are closed the quiesce operation reports that it is
167         // finished.
168         // To properly restart the blueprint containers, first we have to quiesce the list of bundles, and once done, it
169         // is safe to destroy their BlueprintContainer, so no reference is retained.
170         //
171         // Mail - thread explaining Quiesce API:
172         //      https://www.mail-archive.com/dev@aries.apache.org/msg08403.html
173
174         // Quiesced the bundle to unregister the associated BlueprintContainer
175         quiesceParticipant.quiesce(bundlesQuiesced -> {
176
177             // Destroy the container once Quiesced
178             Arrays.stream(bundlesQuiesced).forEach(quiescedBundle -> {
179                 LOG.debug("Quiesced bundle {}", quiescedBundle);
180                 blueprintExtenderService.destroyContainer(
181                         quiescedBundle, blueprintExtenderService.getContainer(quiescedBundle));
182             });
183
184             destroyContainers(remainingBundlesToDestroy, createContainerCallback);
185
186         }, Collections.singletonList(nextBundle));
187     }
188
189     private void createContainers(final List<Bundle> containerBundles) {
190         containerBundles.forEach(bundle -> {
191             List<Object> paths = BlueprintBundleTracker.findBlueprintPaths(bundle);
192
193             LOG.info("Restarting blueprint container for bundle {} with paths {}", bundle, paths);
194
195             blueprintExtenderService.createContainer(bundle, paths);
196         });
197     }
198
199     /**
200      * Recursively finds the services registered by the given bundle and the bundles using those services.
201      * User bundles that have an associated blueprint container are added to containerBundles.
202      *
203      * @param bundle the bundle to traverse
204      * @param containerBundles the current set of bundles containing blueprint containers
205      */
206     private void findDependentContainersRecursively(final Bundle bundle, final Set<Bundle> containerBundles) {
207         if (!containerBundles.add(bundle)) {
208             // Already seen this bundle...
209             return;
210         }
211
212         ServiceReference<?>[] references = bundle.getRegisteredServices();
213         if (references != null) {
214             for (ServiceReference<?> reference : references) {
215                 Bundle[] usingBundles = reference.getUsingBundles();
216                 if (usingBundles != null) {
217                     for (Bundle usingBundle : usingBundles) {
218                         if (blueprintExtenderService.getContainer(usingBundle) != null) {
219                             findDependentContainersRecursively(usingBundle, containerBundles);
220                         }
221                     }
222                 }
223             }
224         }
225     }
226
227     private static ServiceRegistration<?> registerEventHandler(final BundleContext bundleContext,
228             final BlueprintListener listener) {
229         return bundleContext.registerService(BlueprintListener.class.getName(), listener, new Hashtable<>());
230     }
231
232     @Override
233     public void close() {
234         LOG.debug("Closing");
235
236         restartExecutor.shutdownNow();
237     }
238 }