Fixup checkstyle
[controller.git] / opendaylight / blueprint / src / main / java / org / opendaylight / controller / blueprint / BlueprintContainerRestartServiceImpl.java
1 /*
2  * Copyright (c) 2016 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.blueprint;
9
10 import static java.util.Objects.requireNonNull;
11
12 import com.google.common.collect.Lists;
13 import com.google.common.util.concurrent.ThreadFactoryBuilder;
14 import java.util.ArrayDeque;
15 import java.util.ArrayList;
16 import java.util.Arrays;
17 import java.util.Collections;
18 import java.util.Deque;
19 import java.util.LinkedHashSet;
20 import java.util.List;
21 import java.util.Set;
22 import java.util.concurrent.CountDownLatch;
23 import java.util.concurrent.ExecutorService;
24 import java.util.concurrent.Executors;
25 import java.util.concurrent.TimeUnit;
26 import org.apache.aries.blueprint.services.BlueprintExtenderService;
27 import org.apache.aries.quiesce.participant.QuiesceParticipant;
28 import org.apache.aries.util.AriesFrameworkUtil;
29 import org.osgi.framework.Bundle;
30 import org.osgi.framework.BundleContext;
31 import org.osgi.framework.ServiceReference;
32 import org.osgi.framework.ServiceRegistration;
33 import org.osgi.service.blueprint.container.BlueprintEvent;
34 import org.osgi.service.blueprint.container.BlueprintListener;
35 import org.slf4j.Logger;
36 import org.slf4j.LoggerFactory;
37
38 /**
39  * Implementation of the BlueprintContainerRestartService.
40  *
41  * @author Thomas Pantelis
42  */
43 class BlueprintContainerRestartServiceImpl implements AutoCloseable, BlueprintContainerRestartService {
44     private static final Logger LOG = LoggerFactory.getLogger(BlueprintContainerRestartServiceImpl.class);
45     private static final int CONTAINER_CREATE_TIMEOUT_IN_MINUTES = 5;
46
47     private final ExecutorService restartExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
48             .setDaemon(true).setNameFormat("BlueprintContainerRestartService").build());
49
50     private BlueprintExtenderService blueprintExtenderService;
51     private QuiesceParticipant quiesceParticipant;
52
53     void setBlueprintExtenderService(final BlueprintExtenderService blueprintExtenderService) {
54         this.blueprintExtenderService = blueprintExtenderService;
55     }
56
57     void setQuiesceParticipant(final QuiesceParticipant quiesceParticipant) {
58         this.quiesceParticipant = quiesceParticipant;
59     }
60
61     public void restartContainer(final Bundle bundle, final List<Object> paths) {
62         LOG.debug("restartContainer for bundle {}", bundle);
63
64         if (restartExecutor.isShutdown()) {
65             LOG.debug("Already closed - returning");
66             return;
67         }
68
69         restartExecutor.execute(() -> {
70             blueprintExtenderService.destroyContainer(bundle, blueprintExtenderService.getContainer(bundle));
71             blueprintExtenderService.createContainer(bundle, paths);
72         });
73     }
74
75     @Override
76     public void restartContainerAndDependents(final Bundle bundle) {
77         if (restartExecutor.isShutdown()) {
78             return;
79         }
80
81         LOG.debug("restartContainerAndDependents for bundle {}", bundle);
82
83         restartExecutor.execute(() -> restartContainerAndDependentsInternal(bundle));
84     }
85
86     private void restartContainerAndDependentsInternal(final Bundle forBundle) {
87         requireNonNull(blueprintExtenderService);
88         requireNonNull(quiesceParticipant);
89
90         // We use a LinkedHashSet to preserve insertion order as we walk the service usage hierarchy.
91         Set<Bundle> containerBundlesSet = new LinkedHashSet<>();
92         findDependentContainersRecursively(forBundle, containerBundlesSet);
93
94         List<Bundle> containerBundles = new ArrayList<>(containerBundlesSet);
95
96         LOG.info("Restarting blueprint containers for bundle {} and its dependent bundles {}", forBundle,
97                 containerBundles.subList(1, containerBundles.size()));
98
99         // The blueprint containers are created asynchronously so we register a handler for blueprint events
100         // that are sent when a container is complete, successful or not. The CountDownLatch tells when all
101         // containers are complete. This is done to ensure all blueprint containers are finished before we
102         // restart config modules.
103         final CountDownLatch containerCreationComplete = new CountDownLatch(containerBundles.size());
104         ServiceRegistration<?> eventHandlerReg = registerEventHandler(forBundle.getBundleContext(), event -> {
105             final Bundle bundle = event.getBundle();
106             if (event.isReplay()) {
107                 LOG.trace("Got replay BlueprintEvent {} for bundle {}", event.getType(), bundle);
108                 return;
109             }
110
111             LOG.debug("Got BlueprintEvent {} for bundle {}", event.getType(), bundle);
112             if (containerBundles.contains(bundle)
113                     && (event.getType() == BlueprintEvent.CREATED || event.getType() == BlueprintEvent.FAILURE)) {
114                 containerCreationComplete.countDown();
115                 LOG.debug("containerCreationComplete is now {}", containerCreationComplete.getCount());
116             }
117         });
118
119         final Runnable createContainerCallback = () -> createContainers(containerBundles);
120
121         // Destroy the container down-top recursively and once done, restart the container top-down
122         destroyContainers(new ArrayDeque<>(Lists.reverse(containerBundles)), createContainerCallback);
123
124
125         try {
126             if (!containerCreationComplete.await(CONTAINER_CREATE_TIMEOUT_IN_MINUTES, TimeUnit.MINUTES)) {
127                 LOG.warn("Failed to restart all blueprint containers within {} minutes. Attempted to restart {} {} "
128                         + "but only {} completed restart", CONTAINER_CREATE_TIMEOUT_IN_MINUTES, containerBundles.size(),
129                         containerBundles, containerBundles.size() - containerCreationComplete.getCount());
130                 return;
131             }
132         } catch (final InterruptedException e) {
133             LOG.debug("CountDownLatch await was interrupted - returning");
134             return;
135         }
136
137         AriesFrameworkUtil.safeUnregisterService(eventHandlerReg);
138
139         LOG.info("Finished restarting blueprint containers for bundle {} and its dependent bundles", forBundle);
140     }
141
142     /**
143      * Recursively quiesce and destroy the bundles one by one in order to maintain synchronicity and ordering.
144      * @param remainingBundlesToDestroy the list of remaining bundles to destroy.
145      * @param createContainerCallback a {@link Runnable} to {@code run()} when the recursive function is completed.
146      */
147     private void destroyContainers(final Deque<Bundle> remainingBundlesToDestroy,
148             final Runnable createContainerCallback) {
149
150         final Bundle nextBundle;
151         synchronized (remainingBundlesToDestroy) {
152             if (remainingBundlesToDestroy.isEmpty()) {
153                 LOG.debug("All blueprint containers were quiesced and destroyed");
154                 createContainerCallback.run();
155                 return;
156             }
157
158             nextBundle = remainingBundlesToDestroy.poll();
159         }
160
161         // The Quiesce capability is a like a soft-stop, clean-stop. In the case of the Blueprint extender, in flight
162         // service calls are allowed to finish; they're counted in and counted out, and no new calls are allowed. When
163         // there are no in flight service calls, the bundle is told to stop. The Blueprint bundle itself doesn't know
164         // this is happening which is a key design point. In the case of Blueprint, the extender ensures no new Entity
165         // Managers(EMs) are created. Then when all those EMs are closed the quiesce operation reports that it is
166         // finished.
167         // To properly restart the blueprint containers, first we have to quiesce the list of bundles, and once done, it
168         // is safe to destroy their BlueprintContainer, so no reference is retained.
169         //
170         // Mail - thread explaining Quiesce API:
171         //      https://www.mail-archive.com/dev@aries.apache.org/msg08403.html
172
173         // Quiesced the bundle to unregister the associated BlueprintContainer
174         quiesceParticipant.quiesce(bundlesQuiesced -> {
175
176             // Destroy the container once Quiesced
177             Arrays.stream(bundlesQuiesced).forEach(quiescedBundle -> {
178                 LOG.debug("Quiesced bundle {}", quiescedBundle);
179                 blueprintExtenderService.destroyContainer(
180                         quiescedBundle, blueprintExtenderService.getContainer(quiescedBundle));
181             });
182
183             destroyContainers(remainingBundlesToDestroy, createContainerCallback);
184
185         }, Collections.singletonList(nextBundle));
186     }
187
188     private void createContainers(final List<Bundle> containerBundles) {
189         containerBundles.forEach(bundle -> {
190             List<Object> paths = BlueprintBundleTracker.findBlueprintPaths(bundle);
191
192             LOG.info("Restarting blueprint container for bundle {} with paths {}", bundle, paths);
193
194             blueprintExtenderService.createContainer(bundle, paths);
195         });
196     }
197
198     /**
199      * Recursively finds the services registered by the given bundle and the bundles using those services.
200      * User bundles that have an associated blueprint container are added to containerBundles.
201      *
202      * @param bundle the bundle to traverse
203      * @param containerBundles the current set of bundles containing blueprint containers
204      */
205     private void findDependentContainersRecursively(final Bundle bundle, final Set<Bundle> containerBundles) {
206         if (!containerBundles.add(bundle)) {
207             // Already seen this bundle...
208             return;
209         }
210
211         ServiceReference<?>[] references = bundle.getRegisteredServices();
212         if (references != null) {
213             for (ServiceReference<?> reference : references) {
214                 Bundle[] usingBundles = reference.getUsingBundles();
215                 if (usingBundles != null) {
216                     for (Bundle usingBundle : usingBundles) {
217                         if (blueprintExtenderService.getContainer(usingBundle) != null) {
218                             findDependentContainersRecursively(usingBundle, containerBundles);
219                         }
220                     }
221                 }
222             }
223         }
224     }
225
226     private static ServiceRegistration<?> registerEventHandler(final BundleContext bundleContext,
227             final BlueprintListener listener) {
228         return bundleContext.registerService(BlueprintListener.class, listener, null);
229     }
230
231     @Override
232     public void close() {
233         LOG.debug("Closing");
234
235         restartExecutor.shutdownNow();
236     }
237 }