+ systemReadyMonitor.registerListener(this);
+ LOG.info("OpenFlowPluginProvider started, waiting for onSystemBootReady()");
+ }
+
+ @Reference(cardinality = ReferenceCardinality.AT_LEAST_ONE,
+ policy = ReferencePolicy.DYNAMIC, policyOption = ReferencePolicyOption.GREEDY)
+ public synchronized void bindConnectionProvider(final SwitchConnectionProvider switchConnectionProvider) {
+ connectionProviders.add(switchConnectionProvider);
+ LOG.info("Added connection provider {}", switchConnectionProvider);
+
+ if (startedProviders != null) {
+ LOG.info("Starting latecomer connection provider {}", switchConnectionProvider);
+ startingProviders += 1;
+ startProvider(switchConnectionProvider);
+ }
+ }
+
+ public synchronized void unbindConnectionProvider(final SwitchConnectionProvider switchConnectionProvider) {
+ connectionProviders.remove(switchConnectionProvider);
+ if (startedProviders != null && startedProviders.remove(switchConnectionProvider)) {
+ switchConnectionProvider.shutdown();
+ }
+ LOG.info("Removed connection provider {}", switchConnectionProvider);
+ }
+
+ private ListenableFuture<Void> startProvider(final SwitchConnectionProvider provider) {
+ // Inject OpenFlowPlugin custom serializers and deserializers into OpenFlowJava
+ if (config.getUseSingleLayerSerialization()) {
+ SerializerInjector.injectSerializers(provider, provider.getConfiguration().isGroupAddModEnabled());
+ DeserializerInjector.injectDeserializers(provider);
+ } else {
+ DeserializerInjector.revertDeserializers(provider);
+ }
+
+ // Set handler of incoming connections and start switch connection provider
+ final var future = provider.startup(connectionManager);
+ startedProviders.add(provider);
+ Futures.addCallback(future, new FutureCallback<>() {
+ @Override
+ public void onSuccess(final Void result) {
+ LOG.info("Connection provider {} started", provider);
+ connectionStarted();
+ }
+
+ @Override
+ public void onFailure(final Throwable cause) {
+ LOG.warn("Connection provider {} failed to start", provider, cause);
+ connectionFailed(cause);
+ }
+ }, MoreExecutors.directExecutor());
+ return future;
+ }
+
+ @Override
+ public synchronized void onSystemBootReady() {
+ LOG.info("onSystemBootReady() received, starting the switch connections");
+
+ final var size = connectionProviders.size();
+ startedProviders = new ArrayList<>(size);
+ startingProviders = size;
+ connectionProviders.forEach(this::startProvider);
+ }
+
+ private synchronized void connectionFailed(final Throwable cause) {
+ // Decrement below zero, so we do not arrive to zero
+ startingProviders = -1;
+ diagStatusProvider.reportStatus(ServiceState.ERROR, cause);
+ }
+
+ private synchronized void connectionStarted() {
+ if (--startingProviders == 0 && startedProviders.equals(connectionProviders)) {
+ LOG.info("All switchConnectionProviders are up and running ({}).", startedProviders.size());
+ diagStatusProvider.reportStatus(ServiceState.OPERATIONAL);
+ }
+ }
+
+ private ListenableFuture<List<Void>> shutdownSwitchConnections() {
+ final var future = Futures.allAsList(startedProviders.stream()
+ .map(switchConnectionProvider -> {
+ // Revert deserializers to their original state
+ if (config.getUseSingleLayerSerialization()) {
+ DeserializerInjector.revertDeserializers(switchConnectionProvider);
+ }
+
+ // Shutdown switch connection provider
+ return switchConnectionProvider.shutdown();
+ }).collect(Collectors.toList()));
+ startedProviders.clear();
+
+ Futures.addCallback(future, new FutureCallback<>() {
+ @Override
+ public void onSuccess(final List<Void> result) {
+ LOG.info("All switchConnectionProviders were successfully shut down ({}).", result.size());
+ }
+
+ @Override
+ public void onFailure(final Throwable throwable) {
+ LOG.warn("Some switchConnectionProviders failed to shutdown.", throwable);
+ }
+ }, MoreExecutors.directExecutor());
+
+ return future;