* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-
package org.opendaylight.netconf.topology.singleton.impl;
import akka.actor.ActorSystem;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import io.netty.util.concurrent.EventExecutor;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import org.opendaylight.aaa.encrypt.AAAEncryptionService;
import org.opendaylight.controller.cluster.ActorSystemProvider;
import org.opendaylight.controller.config.threadpool.ScheduledThreadPool;
import org.opendaylight.controller.config.threadpool.ThreadPool;
-import org.opendaylight.controller.md.sal.binding.api.ClusteredDataTreeChangeListener;
-import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
-import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
-import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
-import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService;
-import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
+import org.opendaylight.mdsal.binding.api.ClusteredDataTreeChangeListener;
+import org.opendaylight.mdsal.binding.api.DataBroker;
+import org.opendaylight.mdsal.binding.api.DataObjectModification;
+import org.opendaylight.mdsal.binding.api.DataTreeIdentifier;
+import org.opendaylight.mdsal.binding.api.DataTreeModification;
+import org.opendaylight.mdsal.binding.api.WriteTransaction;
+import org.opendaylight.mdsal.common.api.CommitInfo;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.mdsal.dom.api.DOMMountPointService;
+import org.opendaylight.mdsal.dom.api.DOMRpcProviderService;
import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
clusterRegistrations = new ConcurrentHashMap<>();
private final DataBroker dataBroker;
- private final RpcProviderRegistry rpcProviderRegistry;
+ private final DOMRpcProviderService rpcProviderRegistry;
private final ClusterSingletonServiceProvider clusterSingletonServiceProvider;
- private final ScheduledThreadPool keepaliveExecutor;
- private final ThreadPool processingExecutor;
+ private final ScheduledExecutorService keepaliveExecutor;
+ private final ListeningExecutorService processingExecutor;
private final ActorSystem actorSystem;
private final EventExecutor eventExecutor;
private final NetconfClientDispatcher clientDispatcher;
private String privateKeyPath;
private String privateKeyPassphrase;
- public NetconfTopologyManager(final DataBroker dataBroker, final RpcProviderRegistry rpcProviderRegistry,
+ public NetconfTopologyManager(final DataBroker dataBroker, final DOMRpcProviderService rpcProviderRegistry,
final ClusterSingletonServiceProvider clusterSingletonServiceProvider,
final ScheduledThreadPool keepaliveExecutor, final ThreadPool processingExecutor,
final ActorSystemProvider actorSystemProvider,
this.dataBroker = Preconditions.checkNotNull(dataBroker);
this.rpcProviderRegistry = Preconditions.checkNotNull(rpcProviderRegistry);
this.clusterSingletonServiceProvider = Preconditions.checkNotNull(clusterSingletonServiceProvider);
- this.keepaliveExecutor = Preconditions.checkNotNull(keepaliveExecutor);
- this.processingExecutor = Preconditions.checkNotNull(processingExecutor);
+ this.keepaliveExecutor = keepaliveExecutor.getExecutor();
+ this.processingExecutor = MoreExecutors.listeningDecorator(processingExecutor.getExecutor());
this.actorSystem = Preconditions.checkNotNull(actorSystemProvider).getActorSystem();
this.eventExecutor = Preconditions.checkNotNull(eventExecutor);
this.clientDispatcher = Preconditions.checkNotNull(clientDispatcher);
}
@VisibleForTesting
- protected NetconfTopologyContext newNetconfTopologyContext(NetconfTopologySetup setup,
- ServiceGroupIdentifier serviceGroupIdent, Timeout actorResponseWaitTime) {
+ protected NetconfTopologyContext newNetconfTopologyContext(final NetconfTopologySetup setup,
+ final ServiceGroupIdentifier serviceGroupIdent, final Timeout actorResponseWaitTime) {
return new NetconfTopologyContext(setup, serviceGroupIdent, actorResponseWaitTime, mountPointService);
}
dataChangeListenerRegistration = null;
}
- contexts.values().forEach(netconfTopologyContext -> close(netconfTopologyContext));
-
- clusterRegistrations.values().forEach(
- clusterSingletonServiceRegistration -> close(clusterSingletonServiceRegistration));
+ contexts.values().forEach(NetconfTopologyManager::close);
+ clusterRegistrations.values().forEach(NetconfTopologyManager::close);
contexts.clear();
clusterRegistrations.clear();
}
@SuppressWarnings("checkstyle:IllegalCatch")
- private void close(AutoCloseable closeable) {
+ private static void close(final AutoCloseable closeable) {
try {
closeable.close();
} catch (Exception e) {
final WriteTransaction wtx = dataBroker.newWriteOnlyTransaction();
initTopology(wtx, LogicalDatastoreType.CONFIGURATION);
initTopology(wtx, LogicalDatastoreType.OPERATIONAL);
- Futures.addCallback(wtx.submit(), new FutureCallback<Void>() {
+ wtx.commit().addCallback(new FutureCallback<CommitInfo>() {
@Override
- public void onSuccess(final Void result) {
+ public void onSuccess(final CommitInfo result) {
LOG.debug("topology initialization successful");
}
@Override
public void onFailure(@Nonnull final Throwable throwable) {
- LOG.error("Unable to initialize netconf-topology, {}", throwable);
+ LOG.error("Unable to initialize netconf-topology", throwable);
}
}, MoreExecutors.directExecutor());
LOG.debug("Registering datastore listener");
- return dataBroker.registerDataTreeChangeListener(
- new DataTreeIdentifier<>(LogicalDatastoreType.CONFIGURATION,
- NetconfTopologyUtils.createTopologyListPath(topologyId).child(Node.class)), this);
+ return dataBroker.registerDataTreeChangeListener(DataTreeIdentifier.create(LogicalDatastoreType.CONFIGURATION,
+ NetconfTopologyUtils.createTopologyListPath(topologyId).child(Node.class)), this);
}
private void initTopology(final WriteTransaction wtx, final LogicalDatastoreType datastoreType) {