import static java.util.Objects.requireNonNull;
-import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.SettableFuture;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timer;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.osgi.service.component.annotations.ReferencePolicy;
import org.osgi.service.component.annotations.ReferencePolicyOption;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private final HashedWheelTimer hashedWheelTimer =
new HashedWheelTimer(TICK_DURATION, TimeUnit.MILLISECONDS, TICKS_PER_WHEEL);
private final ExtensionConverterManager extensionConverterManager;
- private final List<SwitchConnectionProvider> switchConnectionProviders;
private final DeviceInitializerProvider deviceInitializerProvider;
private final ConvertorManager convertorManager;
private final OpenflowProviderConfig config;
private final ExecutorService executorService;
private final ContextChainHolderImpl contextChainHolder;
private final DiagStatusProvider diagStatusProvider;
- private final SettableFuture<Void> fullyStarted = SettableFuture.create();
+ private final List<SwitchConnectionProvider> connectionProviders = new ArrayList<>();
+
+ private List<SwitchConnectionProvider> startedProviders;
private ConnectionManager connectionManager;
+ private int startingProviders;
@Inject
@Activate
public OpenFlowPluginProviderImpl(@Reference final ConfigurationService configurationService,
- @Reference(cardinality = ReferenceCardinality.AT_LEAST_ONE, policyOption = ReferencePolicyOption.GREEDY)
- final List<SwitchConnectionProvider> switchConnectionProviders,
@Reference final DataBroker dataBroker, @Reference final RpcProviderService rpcProviderRegistry,
@Reference final NotificationPublishService notificationPublishService,
@Reference final ClusterSingletonServiceProvider singletonServiceProvider,
@Reference final DiagStatusProvider diagStatusProvider,
@Reference final SystemReadyMonitor systemReadyMonitor) {
config = new OpenFlowProviderConfigImpl(configurationService);
- this.switchConnectionProviders = List.copyOf(switchConnectionProviders);
final var ppdb = new PingPongDataBroker(dataBroker);
this.diagStatusProvider = requireNonNull(diagStatusProvider);
deviceManager.setContextChainHolder(contextChainHolder);
deviceManager.initialize();
systemReadyMonitor.registerListener(this);
- LOG.info("registered onSystemBootReady() listener for OpenFlowPluginProvider");
+ LOG.info("OpenFlowPluginProvider started, waiting for onSystemBootReady()");
}
- @Override
- public void onSystemBootReady() {
- LOG.info("onSystemBootReady() received, starting the switch connections");
- Futures.addCallback(Futures.allAsList(switchConnectionProviders.stream().map(switchConnectionProvider -> {
- // Inject OpenFlowPlugin custom serializers and deserializers into OpenFlowJava
- if (config.getUseSingleLayerSerialization()) {
- SerializerInjector.injectSerializers(switchConnectionProvider,
- switchConnectionProvider.getConfiguration().isGroupAddModEnabled());
- DeserializerInjector.injectDeserializers(switchConnectionProvider);
- } else {
- DeserializerInjector.revertDeserializers(switchConnectionProvider);
- }
+ @Reference(cardinality = ReferenceCardinality.AT_LEAST_ONE,
+ policy = ReferencePolicy.DYNAMIC, policyOption = ReferencePolicyOption.GREEDY)
+ public synchronized void bindConnectionProvider(final SwitchConnectionProvider switchConnectionProvider) {
+ connectionProviders.add(switchConnectionProvider);
+ LOG.info("Added connection provider {}", switchConnectionProvider);
+
+ if (startedProviders != null) {
+ LOG.info("Starting latecomer connection provider {}", switchConnectionProvider);
+ startingProviders += 1;
+ startProvider(switchConnectionProvider);
+ }
+ }
- // Set handler of incoming connections and start switch connection provider
- return switchConnectionProvider.startup(connectionManager);
- }).collect(Collectors.toSet())), new FutureCallback<List<Void>>() {
+ public synchronized void unbindConnectionProvider(final SwitchConnectionProvider switchConnectionProvider) {
+ connectionProviders.remove(switchConnectionProvider);
+ if (startedProviders != null && startedProviders.remove(switchConnectionProvider)) {
+ switchConnectionProvider.shutdown();
+ }
+ LOG.info("Removed connection provider {}", switchConnectionProvider);
+ }
+
+ private ListenableFuture<Void> startProvider(final SwitchConnectionProvider provider) {
+ // Inject OpenFlowPlugin custom serializers and deserializers into OpenFlowJava
+ if (config.getUseSingleLayerSerialization()) {
+ SerializerInjector.injectSerializers(provider, provider.getConfiguration().isGroupAddModEnabled());
+ DeserializerInjector.injectDeserializers(provider);
+ } else {
+ DeserializerInjector.revertDeserializers(provider);
+ }
+
+ // Set handler of incoming connections and start switch connection provider
+ final var future = provider.startup(connectionManager);
+ startedProviders.add(provider);
+ Futures.addCallback(future, new FutureCallback<>() {
@Override
- public void onSuccess(final List<Void> result) {
- LOG.info("All switchConnectionProviders are up and running ({}).", result.size());
- diagStatusProvider.reportStatus(ServiceState.OPERATIONAL);
- fullyStarted.set(null);
+ public void onSuccess(final Void result) {
+ LOG.info("Connection provider {} started", provider);
+ connectionStarted();
}
@Override
- public void onFailure(final Throwable throwable) {
- LOG.warn("Some switchConnectionProviders failed to start.", throwable);
- diagStatusProvider.reportStatus(ServiceState.ERROR, throwable);
- fullyStarted.setException(throwable);
+ public void onFailure(final Throwable cause) {
+ LOG.warn("Connection provider {} failed to start", provider, cause);
+ connectionFailed(cause);
}
}, MoreExecutors.directExecutor());
+ return future;
+ }
+
+ @Override
+ public synchronized void onSystemBootReady() {
+ LOG.info("onSystemBootReady() received, starting the switch connections");
+
+ final var size = connectionProviders.size();
+ startedProviders = new ArrayList<>(size);
+ startingProviders = size;
+ connectionProviders.forEach(this::startProvider);
}
- @VisibleForTesting
- public Future<Void> getFullyStarted() {
- return fullyStarted;
+ private synchronized void connectionFailed(final Throwable cause) {
+ // Decrement below zero, so we do not arrive to zero
+ startingProviders = -1;
+ diagStatusProvider.reportStatus(ServiceState.ERROR, cause);
}
- private ListenableFuture<List<Boolean>> shutdownSwitchConnections() {
- final ListenableFuture<List<Boolean>> listListenableFuture =
- Futures.allAsList(switchConnectionProviders.stream().map(switchConnectionProvider -> {
- // Revert deserializers to their original state
- if (config.getUseSingleLayerSerialization()) {
- DeserializerInjector.revertDeserializers(switchConnectionProvider);
- }
+ private synchronized void connectionStarted() {
+ if (--startingProviders == 0 && startedProviders.equals(connectionProviders)) {
+ LOG.info("All switchConnectionProviders are up and running ({}).", startedProviders.size());
+ diagStatusProvider.reportStatus(ServiceState.OPERATIONAL);
+ }
+ }
+
+ private ListenableFuture<List<Void>> shutdownSwitchConnections() {
+ final var future = Futures.allAsList(startedProviders.stream()
+ .map(switchConnectionProvider -> {
+ // Revert deserializers to their original state
+ if (config.getUseSingleLayerSerialization()) {
+ DeserializerInjector.revertDeserializers(switchConnectionProvider);
+ }
- // Shutdown switch connection provider
- return switchConnectionProvider.shutdown();
- }).collect(Collectors.toSet()));
+ // Shutdown switch connection provider
+ return switchConnectionProvider.shutdown();
+ }).collect(Collectors.toList()));
+ startedProviders.clear();
- Futures.addCallback(listListenableFuture, new FutureCallback<List<Boolean>>() {
+ Futures.addCallback(future, new FutureCallback<>() {
@Override
- public void onSuccess(final List<Boolean> result) {
+ public void onSuccess(final List<Void> result) {
LOG.info("All switchConnectionProviders were successfully shut down ({}).", result.size());
}
}
}, MoreExecutors.directExecutor());
- return listListenableFuture;
+ return future;
}
@Override
@PreDestroy
@Deactivate
@SuppressWarnings("checkstyle:IllegalCatch")
- public void close() {
+ public synchronized void close() {
+ LOG.info("OpenFlowPluginProvider stopping");
try {
shutdownSwitchConnections().get(10, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
} catch (Exception e) {
LOG.error("Failed to close ConnectionManager", e);
}
+ LOG.info("OpenFlowPluginProvider stopped");
}
@SuppressWarnings("checkstyle:IllegalCatch")
}
}
}
-
-
}