import java.net.URI;
import java.util.Arrays;
import java.util.EnumSet;
-import java.util.List;
+import java.util.HashMap;
import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Function;
import java.util.stream.Collectors;
import javax.inject.Inject;
import org.apache.karaf.bundle.core.BundleService;
import org.junit.Test;
import org.osgi.framework.Bundle;
import org.osgi.framework.BundleContext;
-import org.osgi.framework.BundleListener;
-import org.osgi.framework.InvalidSyntaxException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
static final String FEATURE_FILE_URI_PROP = "feature.test.file.uri";
static final String BUNDLE_CHECK_SKIP = "feature.test.bundle.check.skip";
static final String BUNDLE_CHECK_TIMEOUT_SECONDS = "feature.test.bundle.check.timeout.seconds";
+ static final String BUNDLE_CHECK_INTERVAL_SECONDS = "feature.test.bundle.check.interval.seconds";
+ static final String DEFAULT_TIMEOUT = "300";
+ static final String DEFAULT_INTERVAL = "1";
+
static final String[] ALL_PROPERTY_KEYS =
- {FEATURE_FILE_URI_PROP, BUNDLE_CHECK_SKIP, BUNDLE_CHECK_TIMEOUT_SECONDS};
+ {FEATURE_FILE_URI_PROP, BUNDLE_CHECK_SKIP, BUNDLE_CHECK_TIMEOUT_SECONDS, BUNDLE_CHECK_INTERVAL_SECONDS};
private static final Logger LOG = LoggerFactory.getLogger(TestProbe.class);
+ private static final Map<Integer, String> OSGI_STATES = Map.of(
+ Bundle.INSTALLED, "Installed", Bundle.RESOLVED, "Resolved",
+ Bundle.STARTING, "Starting", Bundle.ACTIVE, "Active",
+ Bundle.STOPPING, "Stopping", Bundle.UNINSTALLED, "Uninstalled");
private static final Map<String, BundleState> ELIGIBLE_STATES = Map.of(
"slf4j.log4j12", Installed,
"org.apache.karaf.scr.management", Waiting);
- private final Map<Long, CheckResult> bundleCheckResults = new ConcurrentHashMap<>();
- private final AtomicReference<CompletableFuture<CheckResult>> checkFutureRef = new AtomicReference<>();
+ private final Map<Long, CheckResult> bundleCheckResults = new HashMap<>();
@Inject
private BundleContext bundleContext;
}
}
- private void checkBundleStates() throws InterruptedException, ExecutionException {
+ private void checkBundleStates() throws InterruptedException {
if ("true".equals(System.getProperty(BUNDLE_CHECK_SKIP))) {
return;
}
- final int timeout = Integer.parseInt(System.getProperty(BUNDLE_CHECK_TIMEOUT_SECONDS, "600"));
- LOG.info("Checking bundle states. Timeout is {} seconds.", timeout);
-
- // start event based states collection
- final BundleListener bundleListener = event -> {
- captureBundleState(event.getBundle());
- updateCheckResults();
- };
- bundleContext.addBundleListener(bundleListener);
- // init all bundles state data
- Arrays.stream(bundleContext.getBundles()).forEach(this::captureBundleState);
- // enable stats analysis
- checkFutureRef.set(new CompletableFuture<>());
- // perform stats analysis
- updateCheckResults();
-
- final CheckResult result;
- try {
- result = checkFutureRef.get().get(timeout, TimeUnit.SECONDS);
- } catch (TimeoutException e) {
- logNokBundleDetails();
- throw new IllegalStateException("Bundles states check was not completed in " + timeout + "seconds", e);
- } finally {
- bundleContext.removeBundleListener(bundleListener);
+ final int timeout = Integer.parseInt(System.getProperty(BUNDLE_CHECK_TIMEOUT_SECONDS, DEFAULT_TIMEOUT));
+ final int interval = Integer.parseInt(System.getProperty(BUNDLE_CHECK_INTERVAL_SECONDS, DEFAULT_INTERVAL));
+ LOG.info("Checking bundle states. Interval = {} second(s). Timeout = {} second(s).", interval, timeout);
+
+ final var maxTimestamp = System.currentTimeMillis() + timeout * 1000L;
+ CheckResult result = CheckResult.IN_PROGRESS;
+ while (System.currentTimeMillis() < maxTimestamp) {
+ Arrays.stream(bundleContext.getBundles()).forEach(this::captureBundleState);
+ result = aggregatedCheckResults();
+ if (result != CheckResult.IN_PROGRESS) {
+ break;
+ }
+ Thread.sleep(interval * 1000L);
}
LOG.info("Bundle state check completed with result {}", result);
+ if (result == CheckResult.IN_PROGRESS) {
+ logNokBundleDetails();
+ throw new IllegalStateException("Bundles states check timeout");
+ }
if (result != CheckResult.SUCCESS) {
logNokBundleDetails();
- throw new IllegalStateException("Bundle states check failed");
+ throw new IllegalStateException("Bundle states check failure");
}
}
if (bundle != null) {
final var info = bundleService.getInfo(bundle);
final var checkResult = checkResultOf(info.getSymbolicName(), info.getState());
- LOG.info("Bundle state updated: {} -> {} ({})", info.getSymbolicName(), info.getState(), checkResult);
- bundleCheckResults.put(bundle.getBundleId(), checkResult);
+ if (checkResult != bundleCheckResults.get(bundle.getBundleId())) {
+ LOG.info("Bundle {} -> State: {} ({})", info.getSymbolicName(), info.getState(), checkResult);
+ bundleCheckResults.put(bundle.getBundleId(), checkResult);
+ }
}
}
- private void updateCheckResults() {
- if (checkFutureRef.get() == null || checkFutureRef.get().isDone()) {
- // don't check stats if results are not expected or already delivered
- return;
- }
+ private CheckResult aggregatedCheckResults() {
final var resultStats = bundleCheckResults.entrySet().stream()
.collect(Collectors.groupingBy(Map.Entry::getValue, Collectors.counting()));
LOG.info("Bundle states check results: total={}, byResult={}", bundleCheckResults.size(), resultStats);
if (resultStats.getOrDefault(CheckResult.FAILURE, 0L) > 0) {
- checkFutureRef.get().complete(CheckResult.FAILURE);
- } else if (resultStats.getOrDefault(CheckResult.STOPPING, 0L) > 0) {
- checkFutureRef.get().complete(CheckResult.STOPPING);
- } else if (resultStats.getOrDefault(CheckResult.IN_PROGRESS, 0L) == 0) {
- checkFutureRef.get().complete(CheckResult.SUCCESS);
+ return CheckResult.FAILURE;
+ }
+ if (resultStats.getOrDefault(CheckResult.STOPPING, 0L) > 0) {
+ return CheckResult.STOPPING;
}
+ return resultStats.getOrDefault(CheckResult.IN_PROGRESS, 0L) == 0
+ ? CheckResult.SUCCESS : CheckResult.IN_PROGRESS;
}
private void logNokBundleDetails() {
final var nokBundles = bundleCheckResults.entrySet().stream()
.filter(entry -> CheckResult.SUCCESS != entry.getValue())
.map(Map.Entry::getKey).collect(Collectors.toSet());
- // log NOK Bundles
+
for (var bundle : bundleContext.getBundles()) {
if (nokBundles.contains(bundle.getBundleId())) {
final var info = bundleService.getInfo(bundle);
- LOG.warn("NOK Bundle {} -> State: {}", info.getSymbolicName(), info.getState());
- }
- }
- // log services of NOK bundles
- try {
- for (var serviceRef : bundleContext.getAllServiceReferences(null, null)) {
- final var bundle = serviceRef.getBundle();
- if (bundle != null && nokBundles.contains(bundle.getBundleId())) {
- final var usingBundles = serviceRef.getUsingBundles();
- final var usingSymbolic = usingBundles == null ? List.of()
- : Arrays.stream(usingBundles).map(Bundle::getSymbolicName).toList();
- final var propKeys = serviceRef.getPropertyKeys();
- final var serviceProps = Arrays.stream(propKeys)
- .collect(Collectors.toMap(Function.identity(), serviceRef::getProperty));
- LOG.warn("NOK Service {} -> of bundle: {}, using: {}, props: {}",
- serviceRef.getClass().getName(), bundle.getSymbolicName(), usingSymbolic, serviceProps);
- }
+ final var diag = bundleService.getDiag(bundle);
+ final var diagText = diag.isEmpty() ? "" : ", diag: " + diag;
+ final var osgiState = OSGI_STATES.getOrDefault(bundle.getState(), "Unknown");
+ LOG.warn("NOK Bundle {}:{} -> OSGi state: {}, Karaf bundle state: {}{}",
+ info.getSymbolicName(), info.getVersion(), osgiState, info.getState(), diagText);
}
- } catch (InvalidSyntaxException e) {
- LOG.warn("Error retrieving services", e);
}
}