X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=netconf%2Fnetconf-topology-singleton%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fnetconf%2Ftopology%2Fsingleton%2Fimpl%2FNetconfTopologyManager.java;h=49160193b4fb5c17c947946e17daa178abcfbc23;hb=f0b0a99508a36b2087b507ad1ab976255599f4af;hp=86b7111f78ce325bfd0b6ee6285f08df3ee30922;hpb=0a95f4298a7f467b6c2eebf7904c8253cf3d5198;p=netconf.git diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/NetconfTopologyManager.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/NetconfTopologyManager.java index 86b7111f78..49160193b4 100644 --- a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/NetconfTopologyManager.java +++ b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/NetconfTopologyManager.java @@ -5,7 +5,6 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - package org.opendaylight.netconf.topology.singleton.impl; import akka.actor.ActorSystem; @@ -13,27 +12,29 @@ import akka.util.Timeout; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import io.netty.util.concurrent.EventExecutor; import java.util.Collection; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import javax.annotation.Nonnull; import org.opendaylight.aaa.encrypt.AAAEncryptionService; import org.opendaylight.controller.cluster.ActorSystemProvider; import org.opendaylight.controller.config.threadpool.ScheduledThreadPool; import org.opendaylight.controller.config.threadpool.ThreadPool; -import org.opendaylight.controller.md.sal.binding.api.ClusteredDataTreeChangeListener; -import org.opendaylight.controller.md.sal.binding.api.DataBroker; -import org.opendaylight.controller.md.sal.binding.api.DataObjectModification; -import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier; -import org.opendaylight.controller.md.sal.binding.api.DataTreeModification; -import org.opendaylight.controller.md.sal.binding.api.WriteTransaction; -import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; -import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService; -import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry; +import org.opendaylight.mdsal.binding.api.ClusteredDataTreeChangeListener; +import org.opendaylight.mdsal.binding.api.DataBroker; +import org.opendaylight.mdsal.binding.api.DataObjectModification; +import org.opendaylight.mdsal.binding.api.DataTreeIdentifier; +import org.opendaylight.mdsal.binding.api.DataTreeModification; +import org.opendaylight.mdsal.binding.api.WriteTransaction; +import org.opendaylight.mdsal.common.api.CommitInfo; +import org.opendaylight.mdsal.common.api.LogicalDatastoreType; +import org.opendaylight.mdsal.dom.api.DOMMountPointService; +import org.opendaylight.mdsal.dom.api.DOMRpcProviderService; import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider; import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration; import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier; @@ -68,10 +69,10 @@ public class NetconfTopologyManager clusterRegistrations = new ConcurrentHashMap<>(); private final DataBroker dataBroker; - private final RpcProviderRegistry rpcProviderRegistry; + private final DOMRpcProviderService rpcProviderRegistry; private final ClusterSingletonServiceProvider clusterSingletonServiceProvider; - private final ScheduledThreadPool keepaliveExecutor; - private final ThreadPool processingExecutor; + private final ScheduledExecutorService keepaliveExecutor; + private final ListeningExecutorService processingExecutor; private final ActorSystem actorSystem; private final EventExecutor eventExecutor; private final NetconfClientDispatcher clientDispatcher; @@ -83,7 +84,7 @@ public class NetconfTopologyManager private String privateKeyPath; private String privateKeyPassphrase; - public NetconfTopologyManager(final DataBroker dataBroker, final RpcProviderRegistry rpcProviderRegistry, + public NetconfTopologyManager(final DataBroker dataBroker, final DOMRpcProviderService rpcProviderRegistry, final ClusterSingletonServiceProvider clusterSingletonServiceProvider, final ScheduledThreadPool keepaliveExecutor, final ThreadPool processingExecutor, final ActorSystemProvider actorSystemProvider, @@ -95,8 +96,8 @@ public class NetconfTopologyManager this.dataBroker = Preconditions.checkNotNull(dataBroker); this.rpcProviderRegistry = Preconditions.checkNotNull(rpcProviderRegistry); this.clusterSingletonServiceProvider = Preconditions.checkNotNull(clusterSingletonServiceProvider); - this.keepaliveExecutor = Preconditions.checkNotNull(keepaliveExecutor); - this.processingExecutor = Preconditions.checkNotNull(processingExecutor); + this.keepaliveExecutor = keepaliveExecutor.getExecutor(); + this.processingExecutor = MoreExecutors.listeningDecorator(processingExecutor.getExecutor()); this.actorSystem = Preconditions.checkNotNull(actorSystemProvider).getActorSystem(); this.eventExecutor = Preconditions.checkNotNull(eventExecutor); this.clientDispatcher = Preconditions.checkNotNull(clientDispatcher); @@ -197,8 +198,8 @@ public class NetconfTopologyManager } @VisibleForTesting - protected NetconfTopologyContext newNetconfTopologyContext(NetconfTopologySetup setup, - ServiceGroupIdentifier serviceGroupIdent, Timeout actorResponseWaitTime) { + protected NetconfTopologyContext newNetconfTopologyContext(final NetconfTopologySetup setup, + final ServiceGroupIdentifier serviceGroupIdent, final Timeout actorResponseWaitTime) { return new NetconfTopologyContext(setup, serviceGroupIdent, actorResponseWaitTime, mountPointService); } @@ -209,17 +210,15 @@ public class NetconfTopologyManager dataChangeListenerRegistration = null; } - contexts.values().forEach(netconfTopologyContext -> close(netconfTopologyContext)); - - clusterRegistrations.values().forEach( - clusterSingletonServiceRegistration -> close(clusterSingletonServiceRegistration)); + contexts.values().forEach(NetconfTopologyManager::close); + clusterRegistrations.values().forEach(NetconfTopologyManager::close); contexts.clear(); clusterRegistrations.clear(); } @SuppressWarnings("checkstyle:IllegalCatch") - private void close(AutoCloseable closeable) { + private static void close(final AutoCloseable closeable) { try { closeable.close(); } catch (Exception e) { @@ -245,22 +244,21 @@ public class NetconfTopologyManager final WriteTransaction wtx = dataBroker.newWriteOnlyTransaction(); initTopology(wtx, LogicalDatastoreType.CONFIGURATION); initTopology(wtx, LogicalDatastoreType.OPERATIONAL); - Futures.addCallback(wtx.submit(), new FutureCallback() { + wtx.commit().addCallback(new FutureCallback() { @Override - public void onSuccess(final Void result) { + public void onSuccess(final CommitInfo result) { LOG.debug("topology initialization successful"); } @Override public void onFailure(@Nonnull final Throwable throwable) { - LOG.error("Unable to initialize netconf-topology, {}", throwable); + LOG.error("Unable to initialize netconf-topology", throwable); } }, MoreExecutors.directExecutor()); LOG.debug("Registering datastore listener"); - return dataBroker.registerDataTreeChangeListener( - new DataTreeIdentifier<>(LogicalDatastoreType.CONFIGURATION, - NetconfTopologyUtils.createTopologyListPath(topologyId).child(Node.class)), this); + return dataBroker.registerDataTreeChangeListener(DataTreeIdentifier.create(LogicalDatastoreType.CONFIGURATION, + NetconfTopologyUtils.createTopologyListPath(topologyId).child(Node.class)), this); } private void initTopology(final WriteTransaction wtx, final LogicalDatastoreType datastoreType) {