X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;ds=sidebyside;f=opendaylight%2Fblueprint%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fblueprint%2FBlueprintContainerRestartServiceImpl.java;h=0e52cb6fb23e7dd26fe604c12b7053cf633d4a9d;hb=3859df9beca8f13f1ff2b2744ed3470a1715bec3;hp=3e04f213495c2cc37c52b19fdf4348e524a9b2bb;hpb=36de1fd1c214d98ae97e6d7725f323dd01e34977;p=controller.git diff --git a/opendaylight/blueprint/src/main/java/org/opendaylight/controller/blueprint/BlueprintContainerRestartServiceImpl.java b/opendaylight/blueprint/src/main/java/org/opendaylight/controller/blueprint/BlueprintContainerRestartServiceImpl.java index 3e04f21349..0e52cb6fb2 100644 --- a/opendaylight/blueprint/src/main/java/org/opendaylight/controller/blueprint/BlueprintContainerRestartServiceImpl.java +++ b/opendaylight/blueprint/src/main/java/org/opendaylight/controller/blueprint/BlueprintContainerRestartServiceImpl.java @@ -7,48 +7,88 @@ */ package org.opendaylight.controller.blueprint; +import static java.util.Objects.requireNonNull; + +import com.google.common.collect.Lists; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.util.ArrayDeque; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Deque; +import java.util.Hashtable; import java.util.LinkedHashSet; import java.util.List; import java.util.Set; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import org.apache.aries.blueprint.services.BlueprintExtenderService; +import org.apache.aries.quiesce.participant.QuiesceParticipant; +import org.apache.aries.util.AriesFrameworkUtil; import org.osgi.framework.Bundle; +import org.osgi.framework.BundleContext; import org.osgi.framework.ServiceReference; +import org.osgi.framework.ServiceRegistration; +import org.osgi.service.blueprint.container.BlueprintEvent; +import org.osgi.service.blueprint.container.BlueprintListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -/** Implementation of the BlueprintContainerRestartService. +/** + * Implementation of the BlueprintContainerRestartService. * * @author Thomas Pantelis */ class BlueprintContainerRestartServiceImpl implements AutoCloseable, BlueprintContainerRestartService { private static final Logger LOG = LoggerFactory.getLogger(BlueprintContainerRestartServiceImpl.class); + private static final int CONTAINER_CREATE_TIMEOUT_IN_MINUTES = 5; + + private final ExecutorService restartExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder() + .setDaemon(true).setNameFormat("BlueprintContainerRestartService").build()); - private final ExecutorService restartExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder(). - setDaemon(true).setNameFormat("BlueprintContainerRestartService").build()); - private final BlueprintExtenderService blueprintExtenderService; + private BlueprintExtenderService blueprintExtenderService; + private QuiesceParticipant quiesceParticipant; - BlueprintContainerRestartServiceImpl(BlueprintExtenderService blueprintExtenderService) { + void setBlueprintExtenderService(final BlueprintExtenderService blueprintExtenderService) { this.blueprintExtenderService = blueprintExtenderService; } + void setQuiesceParticipant(final QuiesceParticipant quiesceParticipant) { + this.quiesceParticipant = quiesceParticipant; + } + + public void restartContainer(final Bundle bundle, final List paths) { + LOG.debug("restartContainer for bundle {}", bundle); + + if (restartExecutor.isShutdown()) { + LOG.debug("Already closed - returning"); + return; + } + + restartExecutor.execute(() -> { + blueprintExtenderService.destroyContainer(bundle, blueprintExtenderService.getContainer(bundle)); + blueprintExtenderService.createContainer(bundle, paths); + }); + } + @Override public void restartContainerAndDependents(final Bundle bundle) { - LOG.debug("restartContainerAndDependents for bundle {}", bundle); + if (restartExecutor.isShutdown()) { + return; + } - restartExecutor.execute(new Runnable() { - @Override - public void run() { - restartContainerAndDependentsInternal(bundle); + LOG.debug("restartContainerAndDependents for bundle {}", bundle); - } - }); + restartExecutor.execute(() -> restartContainerAndDependentsInternal(bundle)); } - private void restartContainerAndDependentsInternal(Bundle forBundle) { + private void restartContainerAndDependentsInternal(final Bundle forBundle) { + requireNonNull(blueprintExtenderService); + requireNonNull(quiesceParticipant); + + // We use a LinkedHashSet to preserve insertion order as we walk the service usage hierarchy. Set containerBundlesSet = new LinkedHashSet<>(); findDependentContainersRecursively(forBundle, containerBundlesSet); @@ -57,20 +97,103 @@ class BlueprintContainerRestartServiceImpl implements AutoCloseable, BlueprintCo LOG.info("Restarting blueprint containers for bundle {} and its dependent bundles {}", forBundle, containerBundles.subList(1, containerBundles.size())); - // Destroy the containers in reverse order with 'forBundle' last, ie bottom-up in the service tree. - for(int i = containerBundles.size() - 1; i >= 0; i--) { - Bundle bundle = containerBundles.get(i); - blueprintExtenderService.destroyContainer(bundle, blueprintExtenderService.getContainer(bundle)); + // The blueprint containers are created asynchronously so we register a handler for blueprint events + // that are sent when a container is complete, successful or not. The CountDownLatch tells when all + // containers are complete. This is done to ensure all blueprint containers are finished before we + // restart config modules. + final CountDownLatch containerCreationComplete = new CountDownLatch(containerBundles.size()); + ServiceRegistration eventHandlerReg = registerEventHandler(forBundle.getBundleContext(), event -> { + final Bundle bundle = event.getBundle(); + if (event.isReplay()) { + LOG.trace("Got replay BlueprintEvent {} for bundle {}", event.getType(), bundle); + return; + } + + LOG.debug("Got BlueprintEvent {} for bundle {}", event.getType(), bundle); + if (containerBundles.contains(bundle) + && (event.getType() == BlueprintEvent.CREATED || event.getType() == BlueprintEvent.FAILURE)) { + containerCreationComplete.countDown(); + LOG.debug("containerCreationComplete is now {}", containerCreationComplete.getCount()); + } + }); + + final Runnable createContainerCallback = () -> createContainers(containerBundles); + + // Destroy the container down-top recursively and once done, restart the container top-down + destroyContainers(new ArrayDeque<>(Lists.reverse(containerBundles)), createContainerCallback); + + + try { + if (!containerCreationComplete.await(CONTAINER_CREATE_TIMEOUT_IN_MINUTES, TimeUnit.MINUTES)) { + LOG.warn("Failed to restart all blueprint containers within {} minutes. Attempted to restart {} {} " + + "but only {} completed restart", CONTAINER_CREATE_TIMEOUT_IN_MINUTES, containerBundles.size(), + containerBundles, containerBundles.size() - containerCreationComplete.getCount()); + return; + } + } catch (final InterruptedException e) { + LOG.debug("CountDownLatch await was interrupted - returning"); + return; } - // Restart the containers top-down starting with 'forBundle'. - for(Bundle bundle: containerBundles) { + AriesFrameworkUtil.safeUnregisterService(eventHandlerReg); + + LOG.info("Finished restarting blueprint containers for bundle {} and its dependent bundles", forBundle); + } + + /** + * Recursively quiesce and destroy the bundles one by one in order to maintain synchronicity and ordering. + * @param remainingBundlesToDestroy the list of remaining bundles to destroy. + * @param createContainerCallback a {@link Runnable} to {@code run()} when the recursive function is completed. + */ + private void destroyContainers(final Deque remainingBundlesToDestroy, + final Runnable createContainerCallback) { + + final Bundle nextBundle; + synchronized (remainingBundlesToDestroy) { + if (remainingBundlesToDestroy.isEmpty()) { + LOG.debug("All blueprint containers were quiesced and destroyed"); + createContainerCallback.run(); + return; + } + + nextBundle = remainingBundlesToDestroy.poll(); + } + + // The Quiesce capability is a like a soft-stop, clean-stop. In the case of the Blueprint extender, in flight + // service calls are allowed to finish; they're counted in and counted out, and no new calls are allowed. When + // there are no in flight service calls, the bundle is told to stop. The Blueprint bundle itself doesn't know + // this is happening which is a key design point. In the case of Blueprint, the extender ensures no new Entity + // Managers(EMs) are created. Then when all those EMs are closed the quiesce operation reports that it is + // finished. + // To properly restart the blueprint containers, first we have to quiesce the list of bundles, and once done, it + // is safe to destroy their BlueprintContainer, so no reference is retained. + // + // Mail - thread explaining Quiesce API: + // https://www.mail-archive.com/dev@aries.apache.org/msg08403.html + + // Quiesced the bundle to unregister the associated BlueprintContainer + quiesceParticipant.quiesce(bundlesQuiesced -> { + + // Destroy the container once Quiesced + Arrays.stream(bundlesQuiesced).forEach(quiescedBundle -> { + LOG.debug("Quiesced bundle {}", quiescedBundle); + blueprintExtenderService.destroyContainer( + quiescedBundle, blueprintExtenderService.getContainer(quiescedBundle)); + }); + + destroyContainers(remainingBundlesToDestroy, createContainerCallback); + + }, Collections.singletonList(nextBundle)); + } + + private void createContainers(final List containerBundles) { + containerBundles.forEach(bundle -> { List paths = BlueprintBundleTracker.findBlueprintPaths(bundle); LOG.info("Restarting blueprint container for bundle {} with paths {}", bundle, paths); blueprintExtenderService.createContainer(bundle, paths); - } + }); } /** @@ -80,19 +203,19 @@ class BlueprintContainerRestartServiceImpl implements AutoCloseable, BlueprintCo * @param bundle the bundle to traverse * @param containerBundles the current set of bundles containing blueprint containers */ - private void findDependentContainersRecursively(Bundle bundle, Set containerBundles) { - if(containerBundles.contains(bundle)) { + private void findDependentContainersRecursively(final Bundle bundle, final Set containerBundles) { + if (!containerBundles.add(bundle)) { + // Already seen this bundle... return; } - containerBundles.add(bundle); ServiceReference[] references = bundle.getRegisteredServices(); if (references != null) { for (ServiceReference reference : references) { Bundle[] usingBundles = reference.getUsingBundles(); - if(usingBundles != null) { - for(Bundle usingBundle: usingBundles) { - if(blueprintExtenderService.getContainer(usingBundle) != null) { + if (usingBundles != null) { + for (Bundle usingBundle : usingBundles) { + if (blueprintExtenderService.getContainer(usingBundle) != null) { findDependentContainersRecursively(usingBundle, containerBundles); } } @@ -101,8 +224,15 @@ class BlueprintContainerRestartServiceImpl implements AutoCloseable, BlueprintCo } } + private static ServiceRegistration registerEventHandler(final BundleContext bundleContext, + final BlueprintListener listener) { + return bundleContext.registerService(BlueprintListener.class.getName(), listener, new Hashtable<>()); + } + @Override public void close() { + LOG.debug("Closing"); + restartExecutor.shutdownNow(); } }