BUG-6975: Integrate Topology Provider with CS Service 61/52761/15
authorClaudio D. Gasparini <claudio.gasparini@pantheon.tech>
Sat, 11 Mar 2017 22:28:26 +0000 (23:28 +0100)
committerClaudio D. Gasparini <claudio.gasparini@pantheon.tech>
Thu, 16 Mar 2017 13:39:35 +0000 (13:39 +0000)
Integrate Topology Provider with CS Service

Change-Id: I0e9c42aa3538e0936b74120535fee369db1ad9d5
Signed-off-by: Claudio D. Gasparini <claudio.gasparini@pantheon.tech>
pcep/topology-provider/src/main/java/org/opendaylight/bgpcep/pcep/topology/provider/AbstractTopologySessionListener.java
pcep/topology-provider/src/main/java/org/opendaylight/bgpcep/pcep/topology/provider/PCEPTopologyProvider.java
pcep/topology-provider/src/main/java/org/opendaylight/bgpcep/pcep/topology/provider/ServerSessionManager.java
pcep/topology-provider/src/main/java/org/opendaylight/bgpcep/pcep/topology/provider/config/PCEPTopologyConfigDependencies.java [new file with mode: 0644]
pcep/topology-provider/src/main/java/org/opendaylight/bgpcep/pcep/topology/provider/config/PCEPTopologyDeployerImpl.java
pcep/topology-provider/src/main/java/org/opendaylight/bgpcep/pcep/topology/provider/config/PCEPTopologyProviderBean.java
pcep/topology-provider/src/main/resources/org/opendaylight/blueprint/pcep-topology.xml
pcep/topology-provider/src/test/java/org/opendaylight/bgpcep/pcep/topology/provider/AbstractPCEPSessionTest.java
pcep/topology-provider/src/test/java/org/opendaylight/bgpcep/pcep/topology/provider/Stateful07TopologySessionListenerTest.java

index ecc5bdf839ea02fb2f2ae139e8d35b3aad6dfafe..b44bdf77489f6a72c047001ce4c3bb10f6347799 100755 (executable)
@@ -346,7 +346,7 @@ public abstract class AbstractTopologySessionListener<S, L> implements PCEPSessi
         this.listenerState.updateStatefulSentMsg(message);
         final PCEPRequest req = new PCEPRequest(metadata);
         this.requests.put(requestId, req);
-        final int rpcTimeout = this.serverSessionManager.getRpcTimeout();
+        final short rpcTimeout = this.serverSessionManager.getRpcTimeout();
         LOG.trace("RPC response timeout value is {} seconds", rpcTimeout);
         if (rpcTimeout > 0) {
             setupTimeoutHandler(requestId, req, rpcTimeout);
@@ -368,7 +368,7 @@ public abstract class AbstractTopologySessionListener<S, L> implements PCEPSessi
         return req.getFuture();
     }
 
-    private void setupTimeoutHandler(final S requestId, final PCEPRequest req, final int timeout) {
+    private void setupTimeoutHandler(final S requestId, final PCEPRequest req, final short timeout) {
         final Timer timer = req.getTimer();
         timer.schedule(new TimerTask() {
             @Override
index d15704f1f604e599f7367e35f30247747e08abbc..71741df9f21ae5ff305795c207a6f60ab90143e3 100755 (executable)
@@ -7,17 +7,22 @@
  */
 package org.opendaylight.bgpcep.pcep.topology.provider;
 
+import static com.google.common.base.Preconditions.checkArgument;
+
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ListenableFuture;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
 import java.net.InetSocketAddress;
 import java.util.List;
+import org.opendaylight.bgpcep.pcep.topology.provider.config.PCEPTopologyConfigDependencies;
 import org.opendaylight.bgpcep.pcep.topology.provider.config.PCEPTopologyProviderDependenciesProvider;
 import org.opendaylight.bgpcep.programming.spi.InstructionScheduler;
 import org.opendaylight.bgpcep.topology.DefaultTopologyReference;
 import org.opendaylight.controller.config.yang.pcep.topology.provider.PCEPTopologyProviderRuntimeRegistrator;
-import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
+import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RoutedRpcRegistration;
 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
 import org.opendaylight.protocol.concepts.KeyMapping;
 import org.opendaylight.protocol.pcep.PCEPCapability;
@@ -25,40 +30,29 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.network.
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.programming.rev131106.NetworkTopologyPcepProgrammingService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev131024.NetworkTopologyPcepService;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
-import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
-import org.osgi.framework.ServiceRegistration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public final class PCEPTopologyProvider extends DefaultTopologyReference implements AutoCloseable {
-
+public final class PCEPTopologyProvider extends DefaultTopologyReference {
     private static final Logger LOG = LoggerFactory.getLogger(PCEPTopologyProvider.class);
 
     private static final String STATEFUL_NOT_DEFINED = "Stateful capability not defined, aborting PCEP Topology" +
         " Provider instantiation";
-    private final BindingAwareBroker.RoutedRpcRegistration<NetworkTopologyPcepProgrammingService> network;
-    private final BindingAwareBroker.RoutedRpcRegistration<NetworkTopologyPcepService> element;
+    private final InstanceIdentifier<Topology> topology;
     private final ServerSessionManager manager;
-    private final Channel channel;
-    private ServiceRegistration<?> serviceRegistration;
-
-    private PCEPTopologyProvider(final Channel channel, final InstanceIdentifier<Topology> topology, final ServerSessionManager manager,
-            final BindingAwareBroker.RoutedRpcRegistration<NetworkTopologyPcepService> element,
-            final BindingAwareBroker.RoutedRpcRegistration<NetworkTopologyPcepProgrammingService> network) {
-        super(topology);
-        this.channel = Preconditions.checkNotNull(channel);
-        this.manager = Preconditions.checkNotNull(manager);
-        this.element = Preconditions.checkNotNull(element);
-        this.network = Preconditions.checkNotNull(network);
-    }
+    private final InetSocketAddress address;
+    private final Optional<KeyMapping> keys;
+    private final InstructionScheduler scheduler;
+    private final PCEPTopologyProviderDependenciesProvider dependenciesProvider;
+    private RoutedRpcRegistration<NetworkTopologyPcepProgrammingService> network;
+    private RoutedRpcRegistration<NetworkTopologyPcepService> element;
+    private Channel channel;
 
     public static PCEPTopologyProvider create(final PCEPTopologyProviderDependenciesProvider dependenciesProvider,
-        final InetSocketAddress address, final Optional<KeyMapping> keys, final InstructionScheduler scheduler,
-        final TopologyId topologyId, final Optional<PCEPTopologyProviderRuntimeRegistrator> runtimeRootRegistrator,
-        final int rpcTimeout) throws Exception {
+        final PCEPTopologyConfigDependencies configDependencies) throws Exception {
         final List<PCEPCapability> capabilities = dependenciesProvider.getPCEPDispatcher()
             .getPCEPSessionNegotiatorFactory().getPCEPSessionProposalFactory().getCapabilities();
         boolean statefulCapability = false;
@@ -75,59 +69,67 @@ public final class PCEPTopologyProvider extends DefaultTopologyReference impleme
         }
 
         final InstanceIdentifier<Topology> topology = InstanceIdentifier.builder(NetworkTopology.class)
-            .child(Topology.class, new TopologyKey(topologyId)).build();
+            .child(Topology.class, new TopologyKey(configDependencies.getTopologyId())).build();
         final ServerSessionManager manager = new ServerSessionManager(dependenciesProvider.getDataBroker(), topology,
-            listenerFactory, rpcTimeout);
-        if(runtimeRootRegistrator.isPresent()){
-            manager.setRuntimeRootRegistrator(runtimeRootRegistrator.get());
+            listenerFactory, configDependencies.getRpcTimeout());
+        final Optional<PCEPTopologyProviderRuntimeRegistrator> runtime = configDependencies.getRuntimeRootRegistrator();
+        if(runtime.isPresent()){
+            manager.setRuntimeRootRegistrator(runtime.get());
         }
-        final ChannelFuture f = dependenciesProvider.getPCEPDispatcher().createServer(address, keys, manager, manager);
-        f.get();
-
-        final RpcProviderRegistry rpcRegistry = dependenciesProvider.getRpcProviderRegistry();
-        final BindingAwareBroker.RoutedRpcRegistration<NetworkTopologyPcepService> element = rpcRegistry
-            .addRoutedRpcImplementation(NetworkTopologyPcepService.class, new TopologyRPCs(manager));
-        element.registerPath(NetworkTopologyContext.class, topology);
 
-        final BindingAwareBroker.RoutedRpcRegistration<NetworkTopologyPcepProgrammingService> network = rpcRegistry
-            .addRoutedRpcImplementation(NetworkTopologyPcepProgrammingService.class,
-                new TopologyProgramming(scheduler, manager));
-        network.registerPath(NetworkTopologyContext.class, topology);
+        return new PCEPTopologyProvider(configDependencies.getAddress(), configDependencies.getKeys(),
+            dependenciesProvider, topology, manager,  configDependencies.getSchedulerDependency());
+    }
 
-        return new PCEPTopologyProvider(f.channel(), topology, manager, element, network);
+    private PCEPTopologyProvider(final InetSocketAddress address, final Optional<KeyMapping> keys,
+        final PCEPTopologyProviderDependenciesProvider dependenciesProvider,
+        final InstanceIdentifier<Topology> topology, final ServerSessionManager manager,
+        final InstructionScheduler scheduler) {
+        super(topology);
+        this.dependenciesProvider = Preconditions.checkNotNull(dependenciesProvider);
+        this.address = address;
+        this.topology = Preconditions.checkNotNull(topology);
+        this.keys = keys;
+        this.manager = Preconditions.checkNotNull(manager);
+        this.scheduler = scheduler;
     }
 
-    @Override
-    public void close() {
+    public void instantiateServiceInstance() {
+        final RpcProviderRegistry rpcRegistry = this.dependenciesProvider.getRpcProviderRegistry();
+
+        this.element = Preconditions.checkNotNull(rpcRegistry
+            .addRoutedRpcImplementation(NetworkTopologyPcepService.class, new TopologyRPCs(this.manager)));
+        this.element.registerPath(NetworkTopologyContext.class, this.topology);
+
+        this.network = Preconditions.checkNotNull(rpcRegistry
+            .addRoutedRpcImplementation(NetworkTopologyPcepProgrammingService.class,
+                new TopologyProgramming(this.scheduler, this.manager)));
+        this.network.registerPath(NetworkTopologyContext.class, this.topology);
         try {
-            this.channel.close().sync();
-            LOG.debug("Server channel {} closed", this.channel);
-        } catch (final InterruptedException e) {
-            LOG.error("Failed to close channel {}", this.channel, e);
+            this.manager.instantiateServiceInstance().checkedGet();
+            final ChannelFuture channelFuture = this.dependenciesProvider.getPCEPDispatcher()
+                .createServer(this.address, this.keys, this.manager, this.manager);
+            channelFuture.get();
+            this.channel = channelFuture.channel();
+        } catch (final Exception e) {
+            LOG.error("Failed to instantiate PCEP Topology provider", e);
         }
 
-        try {
+    }
+
+    public ListenableFuture<Void> closeServiceInstance() {
+        //FIXME return also channelClose once ListenableFuture implements wildcard
+        this.channel.close().addListener((ChannelFutureListener) future ->
+            checkArgument(future.isSuccess(), "Channel failed to close: %s", future.cause()));
+
+        if (this.network != null) {
             this.network.close();
-        } catch (final Exception e) {
-            LOG.error("Failed to unregister network-level RPCs", e);
+            this.network = null;
         }
-        try {
+        if (this.element != null) {
             this.element.close();
-        } catch (final Exception e) {
-            LOG.error("Failed to unregister element-level RPCs", e);
-        }
-        try {
-            this.manager.close();
-        } catch (final Exception e) {
-            LOG.error("Failed to shutdown session manager", e);
+            this.element = null;
         }
-        if (this.serviceRegistration != null) {
-            this.serviceRegistration.unregister();
-            this.serviceRegistration = null;
-        }
-    }
-
-    public synchronized void setServiceRegistration(final ServiceRegistration<?> serviceRegistration) {
-        this.serviceRegistration = serviceRegistration;
+        return this.manager.closeServiceInstance();
     }
-}
+}
\ No newline at end of file
index 4bf1bc4e872c4e208679c88f3640176badde6aed..34d109907c408c8d82a256460d8c5bc3c0b9d485 100755 (executable)
@@ -8,6 +8,9 @@
 package org.opendaylight.bgpcep.pcep.topology.provider;
 
 import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
@@ -26,7 +29,6 @@ import org.opendaylight.controller.config.yang.pcep.topology.provider.PCEPTopolo
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.opendaylight.protocol.pcep.PCEPPeerProposal;
 import org.opendaylight.protocol.pcep.PCEPSession;
@@ -43,6 +45,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev131024.UpdateLspArgs;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev131024.topology.pcep.type.TopologyPcepBuilder;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyBuilder;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
@@ -54,7 +57,8 @@ import org.slf4j.LoggerFactory;
 /**
  *
  */
-final class ServerSessionManager implements PCEPSessionListenerFactory, AutoCloseable, TopologySessionRPCs, PCEPTopologyProviderRuntimeMXBean, PCEPPeerProposal {
+final class ServerSessionManager implements PCEPSessionListenerFactory, TopologySessionRPCs,
+    PCEPTopologyProviderRuntimeMXBean, PCEPPeerProposal {
     private static final Logger LOG = LoggerFactory.getLogger(ServerSessionManager.class);
     private static final long DEFAULT_HOLD_STATE_NANOS = TimeUnit.MINUTES.toNanos(5);
 
@@ -67,25 +71,45 @@ final class ServerSessionManager implements PCEPSessionListenerFactory, AutoClos
     private final DataBroker broker;
     private final PCEPStatefulPeerProposal peerProposal;
     private final AtomicBoolean isClosed = new AtomicBoolean(false);
-    private final int rpcTimeout;
+    private final short rpcTimeout;
     private final AtomicReference<PCEPTopologyProviderRuntimeRegistration> runtimeRootRegistration = new AtomicReference<>();
 
     public ServerSessionManager(final DataBroker broker, final InstanceIdentifier<Topology> topology,
-            final TopologySessionListenerFactory listenerFactory, final int rpcTimeout) throws ReadFailedException, TransactionCommitFailedException {
+        final TopologySessionListenerFactory listenerFactory, final short rpcTimeout) {
         this.broker = Preconditions.checkNotNull(broker);
         this.topology = Preconditions.checkNotNull(topology);
         this.listenerFactory = Preconditions.checkNotNull(listenerFactory);
         this.peerProposal = PCEPStatefulPeerProposal.createStatefulPeerProposal(this.broker, this.topology);
         this.rpcTimeout = rpcTimeout;
+    }
 
-        // Now create the base topology
-        final TopologyKey k = InstanceIdentifier.keyOf(topology);
-        final WriteTransaction tx = broker.newWriteOnlyTransaction();
-        tx.put(LogicalDatastoreType.OPERATIONAL, topology, new TopologyBuilder().setKey(k).setTopologyId(k.getTopologyId()).setTopologyTypes(
-                new TopologyTypesBuilder().addAugmentation(TopologyTypes1.class,
-                        new TopologyTypes1Builder().setTopologyPcep(new TopologyPcepBuilder().build()).build()).build()).setNode(
-                new ArrayList<>()).build(), true);
-        tx.submit().checkedGet();
+    /**
+     * Create Base Topology
+     *
+     * @throws TransactionCommitFailedException exception
+     */
+    synchronized CheckedFuture<Void, TransactionCommitFailedException> instantiateServiceInstance() {
+        final TopologyKey key = InstanceIdentifier.keyOf(this.topology);
+        final TopologyId topologyId = key.getTopologyId();
+        final WriteTransaction tx = this.broker.newWriteOnlyTransaction();
+        tx.put(LogicalDatastoreType.OPERATIONAL, this.topology, new TopologyBuilder().setKey(key)
+            .setTopologyId(topologyId).setTopologyTypes(new TopologyTypesBuilder()
+                .addAugmentation(TopologyTypes1.class, new TopologyTypes1Builder().setTopologyPcep(
+                    new TopologyPcepBuilder().build()).build()).build())
+            .setNode(new ArrayList<>()).build(), true);
+        final CheckedFuture<Void, TransactionCommitFailedException> future = tx.submit();
+        Futures.addCallback(future, new FutureCallback<Void>() {
+            @Override
+            public void onSuccess(final Void result) {
+                LOG.debug("PCEP Topology {} created successfully.", topologyId.getValue());
+            }
+
+            @Override
+            public void onFailure(final Throwable t) {
+                LOG.error("Failed to create PCEP Topology {}.", topologyId.getValue(), t);
+            }
+        });
+        return future;
     }
 
     private static NodeId createNodeId(final InetAddress addr) {
@@ -168,11 +192,10 @@ final class ServerSessionManager implements PCEPSessionListenerFactory, AutoClos
         return (l != null) ? l.triggerSync(input) : OperationResults.UNSENT.future();
     }
 
-    @Override
-    public synchronized void close() throws TransactionCommitFailedException {
+    synchronized ListenableFuture<Void> closeServiceInstance() {
         if (this.isClosed.getAndSet(true)) {
             LOG.error("Session Manager has already been closed.");
-            return;
+            Futures.immediateFuture(null);
         }
         final PCEPTopologyProviderRuntimeRegistration runtimeReg = this.runtimeRootRegistration.getAndSet(null);
         if (runtimeReg != null) {
@@ -188,7 +211,19 @@ final class ServerSessionManager implements PCEPSessionListenerFactory, AutoClos
         this.state.clear();
         final WriteTransaction t = this.broker.newWriteOnlyTransaction();
         t.delete(LogicalDatastoreType.OPERATIONAL, this.topology);
-        t.submit().checkedGet();
+        final CheckedFuture<Void, TransactionCommitFailedException> future = t.submit();
+        Futures.addCallback(future, new FutureCallback<Void>() {
+            @Override
+            public void onSuccess(final Void result) {
+                LOG.debug("Topology {} removed", ServerSessionManager.this.topology);
+            }
+
+            @Override
+            public void onFailure(final Throwable t) {
+                LOG.warn("Failed to remove Topology {}", ServerSessionManager.this.topology, t);
+            }
+        });
+        return future;
     }
 
     synchronized void setRuntimeRootRegistrator(final PCEPTopologyProviderRuntimeRegistrator runtimeRootRegistrator) {
@@ -213,7 +248,7 @@ final class ServerSessionManager implements PCEPSessionListenerFactory, AutoClos
         this.peerProposal.setPeerProposal(createNodeId(address.getAddress()), openBuilder);
     }
 
-    public int getRpcTimeout() {
+    short getRpcTimeout() {
         return this.rpcTimeout;
     }
-}
+}
\ No newline at end of file
diff --git a/pcep/topology-provider/src/main/java/org/opendaylight/bgpcep/pcep/topology/provider/config/PCEPTopologyConfigDependencies.java b/pcep/topology-provider/src/main/java/org/opendaylight/bgpcep/pcep/topology/provider/config/PCEPTopologyConfigDependencies.java
new file mode 100644 (file)
index 0000000..daf54ec
--- /dev/null
@@ -0,0 +1,61 @@
+/*
+ * Copyright (c) 2017 Pantheon Technologies s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.bgpcep.pcep.topology.provider.config;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.base.Optional;
+import java.net.InetSocketAddress;
+import org.opendaylight.bgpcep.programming.spi.InstructionScheduler;
+import org.opendaylight.controller.config.yang.pcep.topology.provider.PCEPTopologyProviderRuntimeRegistrator;
+import org.opendaylight.protocol.concepts.KeyMapping;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
+
+public final class PCEPTopologyConfigDependencies {
+    private final InetSocketAddress address;
+    private final Optional<KeyMapping> keys;
+    private final InstructionScheduler scheduler;
+    private final TopologyId topologyId;
+    private final Optional<PCEPTopologyProviderRuntimeRegistrator> runtime;
+    private final short rpcTimeout;
+
+    public PCEPTopologyConfigDependencies(final InetSocketAddress address, final Optional<KeyMapping> keys,
+        final InstructionScheduler scheduler, final TopologyId topologyId,
+        final Optional<PCEPTopologyProviderRuntimeRegistrator> runtime, final short rpcTimeout) {
+        this.address = checkNotNull(address);
+        this.keys = checkNotNull(keys);
+        this.scheduler = checkNotNull(scheduler);
+        this.topologyId = checkNotNull(topologyId);
+        this.runtime = checkNotNull(runtime);
+        this.rpcTimeout = rpcTimeout;
+    }
+
+    public TopologyId getTopologyId() {
+        return this.topologyId;
+    }
+
+    public InstructionScheduler getSchedulerDependency() {
+        return this.scheduler;
+    }
+
+    public short getRpcTimeout() {
+        return this.rpcTimeout;
+    }
+
+    public Optional<PCEPTopologyProviderRuntimeRegistrator> getRuntimeRootRegistrator() {
+        return this.runtime;
+    }
+
+    public InetSocketAddress getAddress() {
+        return this.address;
+    }
+
+    public Optional<KeyMapping> getKeys() {
+        return this.keys;
+    }
+}
index 78587a4f0007bce962781108e34e62ba8873c4f7..7794bc54088d7a0f4f38e8de356432ade1c90f65 100644 (file)
@@ -44,8 +44,9 @@ public class PCEPTopologyDeployerImpl implements PCEPTopologyDeployer, AutoClose
         final PCEPTopologyProviderBean PCEPTopologyProviderBean = (PCEPTopologyProviderBean) this.container
             .getComponentInstance(PCEPTopologyProviderBean.class.getSimpleName());
         this.pcepTopologyServices.put(topologyId, PCEPTopologyProviderBean);
-        PCEPTopologyProviderBean.start(inetSocketAddress, keys, schedulerDependency, topologyId,
-            runtime, rpcTimeout);
+        final PCEPTopologyConfigDependencies configDependencies = new PCEPTopologyConfigDependencies(inetSocketAddress,
+            keys, schedulerDependency, topologyId, runtime, rpcTimeout);
+        PCEPTopologyProviderBean.start(configDependencies);
     }
 
     @Override
index 3077d23782f996847a84c5d82b01b69a419a80c0..cc43d1ce64da174cff5ed3566f0f0f1cdded5701 100644 (file)
@@ -7,23 +7,25 @@
  */
 package org.opendaylight.bgpcep.pcep.topology.provider.config;
 
-import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
-import java.net.InetSocketAddress;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
 import java.util.Dictionary;
 import java.util.Hashtable;
 import java.util.List;
+import javax.annotation.Nonnull;
+import javax.annotation.concurrent.GuardedBy;
 import org.opendaylight.bgpcep.pcep.topology.provider.PCEPTopologyProvider;
 import org.opendaylight.bgpcep.pcep.topology.provider.TopologySessionListenerFactory;
-import org.opendaylight.bgpcep.programming.spi.InstructionScheduler;
 import org.opendaylight.bgpcep.topology.DefaultTopologyReference;
-import org.opendaylight.controller.config.yang.pcep.topology.provider.PCEPTopologyProviderRuntimeRegistrator;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
-import org.opendaylight.protocol.concepts.KeyMapping;
+import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonService;
+import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
+import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
+import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
 import org.opendaylight.protocol.pcep.PCEPCapability;
 import org.opendaylight.protocol.pcep.PCEPDispatcher;
-import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
 import org.osgi.framework.BundleContext;
 import org.osgi.framework.ServiceRegistration;
 import org.slf4j.Logger;
@@ -39,11 +41,13 @@ public final class PCEPTopologyProviderBean implements PCEPTopologyProviderDepen
     private final TopologySessionListenerFactory sessionListenerFactory;
     private final RpcProviderRegistry rpcProviderRegistry;
     private final BundleContext bundleContext;
-    private PCEPTopologyProvider pcepTopoProvider;
+    private final ClusterSingletonServiceProvider cssp;
+    private PCEPTopologyProviderBeanCSS pcepTopoProviderCSS;
 
-    public PCEPTopologyProviderBean(final BundleContext bundleContext, final DataBroker dataBroker,
-        final PCEPDispatcher pcepDispatcher, final RpcProviderRegistry rpcProviderRegistry,
+    public PCEPTopologyProviderBean(final ClusterSingletonServiceProvider cssp, final BundleContext bundleContext,
+        final DataBroker dataBroker, final PCEPDispatcher pcepDispatcher, final RpcProviderRegistry rpcProviderRegistry,
         final TopologySessionListenerFactory sessionListenerFactory) {
+        this.cssp = Preconditions.checkNotNull(cssp);
         this.bundleContext = Preconditions.checkNotNull(bundleContext);
         this.pcepDispatcher = Preconditions.checkNotNull(pcepDispatcher);
         this.dataBroker = Preconditions.checkNotNull(dataBroker);
@@ -59,27 +63,18 @@ public final class PCEPTopologyProviderBean implements PCEPTopologyProviderDepen
 
     @Override
     public void close() {
-        if (this.pcepTopoProvider != null) {
-            this.pcepTopoProvider.close();
+        if (this.pcepTopoProviderCSS != null) {
+            this.pcepTopoProviderCSS.close();
         }
     }
 
-    public void start(final InetSocketAddress inetSocketAddress, final Optional<KeyMapping> keys,
-        final InstructionScheduler schedulerDependency, final TopologyId topologyId,
-        final Optional<PCEPTopologyProviderRuntimeRegistrator> runtime, final short rpcTimeout) {
-        Preconditions.checkState(this.pcepTopoProvider == null,
+    public void start(final PCEPTopologyConfigDependencies configDependencies) {
+        Preconditions.checkState(this.pcepTopoProviderCSS == null,
             "Previous instance %s was not closed.", this);
         try {
-            this.pcepTopoProvider = PCEPTopologyProvider.create(this,
-                inetSocketAddress, keys, schedulerDependency, topologyId, runtime, rpcTimeout);
-
-            final Dictionary<String, String> properties = new Hashtable<>();
-            properties.put(PCEPTopologyProvider.class.getName(), topologyId.getValue());
-            final ServiceRegistration<?> serviceRegistration = this.bundleContext
-                .registerService(DefaultTopologyReference.class.getName(), this.pcepTopoProvider, properties);
-            this.pcepTopoProvider.setServiceRegistration(serviceRegistration);
+            this.pcepTopoProviderCSS = new PCEPTopologyProviderBeanCSS(configDependencies);
         } catch (final Exception e) {
-            LOG.debug("Failed to create PCEPTopologyProvider {}", topologyId.getValue());
+            LOG.debug("Failed to create PCEPTopologyProvider {}", configDependencies.getTopologyId().getValue());
         }
     }
 
@@ -102,4 +97,66 @@ public final class PCEPTopologyProviderBean implements PCEPTopologyProviderDepen
     public TopologySessionListenerFactory getTopologySessionListenerFactory() {
         return this.sessionListenerFactory;
     }
+
+    private class PCEPTopologyProviderBeanCSS implements ClusterSingletonService, AutoCloseable {
+        private final ServiceGroupIdentifier sgi;
+        private ServiceRegistration<?> serviceRegistration;
+        private ClusterSingletonServiceRegistration cssRegistration;
+        private final PCEPTopologyProvider pcepTopoProvider;
+        @GuardedBy("this")
+        private boolean serviceInstantiated;
+
+        PCEPTopologyProviderBeanCSS(final PCEPTopologyConfigDependencies configDependencies) throws Exception {
+                this.sgi = configDependencies.getSchedulerDependency().getIdentifier();
+                this.pcepTopoProvider = PCEPTopologyProvider.create(PCEPTopologyProviderBean.this, configDependencies);
+
+                final Dictionary<String, String> properties = new Hashtable<>();
+                properties.put(PCEPTopologyProvider.class.getName(), configDependencies.getTopologyId().getValue());
+                this.serviceRegistration = PCEPTopologyProviderBean.this.bundleContext
+                    .registerService(DefaultTopologyReference.class.getName(), this.pcepTopoProvider, properties);
+            LOG.info("PCEP Topology Provider service {} registered", getIdentifier().getValue());
+            this.cssRegistration = PCEPTopologyProviderBean.this.cssp.registerClusterSingletonService(this);
+        }
+
+        @Override
+        public synchronized void instantiateServiceInstance() {
+            LOG.info("Topology Provider Singleton Service {} instantiated", getIdentifier().getValue());
+            if (this.pcepTopoProvider != null) {
+                this.pcepTopoProvider.instantiateServiceInstance();
+                this.serviceInstantiated = true;
+            }
+        }
+
+        @Override
+        public synchronized ListenableFuture<Void> closeServiceInstance() {
+            LOG.info("Close Topology Provider Singleton Service {}", getIdentifier().getValue());
+            if (this.pcepTopoProvider != null && this.serviceInstantiated) {
+                this.serviceInstantiated = false;
+                return this.pcepTopoProvider.closeServiceInstance();
+            }
+            return Futures.immediateFuture(null);
+        }
+
+        @Nonnull
+        @Override
+        public ServiceGroupIdentifier getIdentifier() {
+            return this.sgi;
+        }
+
+        @Override
+        public void close() {
+            if (this.cssRegistration != null) {
+                try {
+                    this.cssRegistration.close();
+                } catch (final Exception e) {
+                    LOG.debug("Failed to close PCEP Topology Provider service {}", this.sgi.getValue(), e);
+                }
+                this.cssRegistration = null;
+            }
+            if (this.serviceRegistration != null) {
+                this.serviceRegistration.unregister();
+                this.serviceRegistration = null;
+            }
+        }
+    }
 }
index 8d80176a3598c5fcebe3eb5e8aefabdd511d216c..85af8773076a193d2a2ca803a7ee084a7245ae5c 100644 (file)
@@ -19,6 +19,8 @@
     <reference id="dataBroker" interface="org.opendaylight.controller.md.sal.binding.api.DataBroker" odl:type="pingpong"/>
     <reference id="rpcRegistry" interface="org.opendaylight.controller.sal.binding.api.RpcProviderRegistry"/>
     <reference id="pcepDispatcher" interface="org.opendaylight.protocol.pcep.PCEPDispatcher"/>
+    <reference id="clusterSingletonServiceProvider"
+               interface="org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider"/>
 
     <bean id="pcepTopologyDeployer" class="org.opendaylight.bgpcep.pcep.topology.provider.config.PCEPTopologyDeployerImpl"
           destroy-method="close">
@@ -28,6 +30,7 @@
 
     <bean id="PCEPTopologyProviderBean" class="org.opendaylight.bgpcep.pcep.topology.provider.config.PCEPTopologyProviderBean"
           scope="prototype">
+        <argument ref="clusterSingletonServiceProvider"/>
         <argument ref="blueprintBundleContext"/>
         <argument ref="dataBroker"/>
         <argument ref="pcepDispatcher"/>
index 564aa9b2416539e482b66de990e460d881e0c0eb..e564d59f3841accc90c4343d8d87d9629de9a598 100644 (file)
@@ -66,25 +66,27 @@ import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.opendaylight.yangtools.yang.binding.Notification;
 
-public abstract class AbstractPCEPSessionTest<T extends TopologySessionListenerFactory> extends AbstractConcurrentDataBrokerTest {
+public abstract class AbstractPCEPSessionTest<T extends TopologySessionListenerFactory>
+    extends AbstractConcurrentDataBrokerTest {
 
-    protected static final String TEST_TOPOLOGY_NAME = "testtopo";
+    private static final String TEST_TOPOLOGY_NAME = "testtopo";
     static final InstanceIdentifier<Topology> TOPO_IID = InstanceIdentifier.builder(NetworkTopology.class).child(
             Topology.class, new TopologyKey(new TopologyId(TEST_TOPOLOGY_NAME))).build();
-    protected static final String IPV4_MASK = "/32";
-    protected static final short DEAD_TIMER = 30;
-    protected static final short KEEP_ALIVE = 10;
-    protected static final int RPC_TIMEOUT = 4;
-
-    protected final String testAddress = InetSocketAddressUtil.getRandomLoopbackIpAddress();
-    protected final NodeId nodeId = new NodeId("pcc://" + this.testAddress);
-    protected final InstanceIdentifier<PathComputationClient> pathComputationClientIId = TOPO_IID.builder().child(Node.class,
-        new NodeKey(this.nodeId)).augmentation(Node1.class).child(PathComputationClient.class).build();
-    protected final String eroIpPrefix = this.testAddress + IPV4_MASK;
-    protected final String newDestinationAddress = InetSocketAddressUtil.getRandomLoopbackIpAddress();
-    protected final String dstIpPrefix = this.newDestinationAddress + IPV4_MASK;
-
-    protected List<Notification> receivedMsgs;
+    private static final String IPV4_MASK = "/32";
+    static final short DEAD_TIMER = 30;
+    static final short KEEP_ALIVE = 10;
+    static final short RPC_TIMEOUT = 4;
+
+    final String testAddress = InetSocketAddressUtil.getRandomLoopbackIpAddress();
+    final NodeId nodeId = new NodeId("pcc://" + this.testAddress);
+    protected final InstanceIdentifier<PathComputationClient> pathComputationClientIId = TOPO_IID.builder()
+        .child(Node.class, new NodeKey(this.nodeId)).augmentation(Node1.class).child(PathComputationClient.class
+        ).build();
+    final String eroIpPrefix = this.testAddress + IPV4_MASK;
+    final String newDestinationAddress = InetSocketAddressUtil.getRandomLoopbackIpAddress();
+    final String dstIpPrefix = this.newDestinationAddress + IPV4_MASK;
+
+    List<Notification> receivedMsgs;
 
     @Mock
     private EventLoop eventLoop;
@@ -99,17 +101,16 @@ public abstract class AbstractPCEPSessionTest<T extends TopologySessionListenerF
     private ChannelFuture channelFuture;
 
     @Mock
-    protected ListenerStateRuntimeRegistration listenerReg;
+    ListenerStateRuntimeRegistration listenerReg;
 
-    private T listenerFactory;
-
-    private final Open localPrefs = new OpenBuilder().setDeadTimer((short) 30).setKeepalive((short) 10).setSessionId((short) 0).build();
+    private final Open localPrefs = new OpenBuilder().setDeadTimer((short) 30).setKeepalive((short) 10)
+        .setSessionId((short) 0).build();
 
     private final Open remotePrefs = this.localPrefs;
 
-    protected ServerSessionManager manager;
+    ServerSessionManager manager;
 
-    protected NetworkTopologyPcepService topologyRpcs;
+    NetworkTopologyPcepService topologyRpcs;
 
     private DefaultPCEPSessionNegotiator neg;
 
@@ -125,9 +126,11 @@ public abstract class AbstractPCEPSessionTest<T extends TopologySessionListenerF
         doReturn(null).when(this.channelFuture).addListener(Mockito.any());
         doReturn("TestingChannel").when(this.clientListener).toString();
         doReturn(this.pipeline).when(this.clientListener).pipeline();
-        doReturn(this.pipeline).when(this.pipeline).replace(any(ChannelHandler.class), any(String.class), any(ChannelHandler.class));
+        doReturn(this.pipeline).when(this.pipeline).replace(any(ChannelHandler.class), any(String.class),
+            any(ChannelHandler.class));
         doReturn(this.eventLoop).when(this.clientListener).eventLoop();
-        doReturn(null).when(this.eventLoop).schedule(any(Runnable.class), any(long.class), any(TimeUnit.class));
+        doReturn(null).when(this.eventLoop).schedule(any(Runnable.class), any(long.class),
+            any(TimeUnit.class));
         doReturn(true).when(this.clientListener).isActive();
         final SocketAddress ra = new InetSocketAddress(this.testAddress, 4189);
         doReturn(ra).when(this.clientListener).remoteAddress();
@@ -143,20 +146,22 @@ public abstract class AbstractPCEPSessionTest<T extends TopologySessionListenerF
         final PCEPTopologyProviderRuntimeRegistrator registrator = mock(PCEPTopologyProviderRuntimeRegistrator.class);
         doReturn(topologyReg).when(registrator).register(any(PCEPTopologyProviderRuntimeMXBean.class));
 
-        this.listenerFactory = (T) ((Class) ((ParameterizedType) this.getClass().getGenericSuperclass()).getActualTypeArguments()[0]).newInstance();
-        this.manager = new ServerSessionManager(getDataBroker(), TOPO_IID, this.listenerFactory, RPC_TIMEOUT);
+        final T listenerFactory = (T) ((Class) ((ParameterizedType) this.getClass().getGenericSuperclass())
+            .getActualTypeArguments()[0]).newInstance();
+        this.manager = new ServerSessionManager(getDataBroker(), TOPO_IID, listenerFactory, RPC_TIMEOUT);
         this.manager.setRuntimeRootRegistrator(registrator);
-
-        this.neg = new DefaultPCEPSessionNegotiator(mock(Promise.class), this.clientListener, this.manager.getSessionListener(), (short) 1, 5, this.localPrefs);
+        this.manager.instantiateServiceInstance().checkedGet();
+        this.neg = new DefaultPCEPSessionNegotiator(mock(Promise.class), this.clientListener,
+            this.manager.getSessionListener(), (short) 1, 5, this.localPrefs);
         this.topologyRpcs = new TopologyRPCs(this.manager);
     }
 
     @After
     public void tearDown() throws TransactionCommitFailedException {
-        this.manager.close();
+        this.manager.closeServiceInstance();
     }
 
-    protected Ero createEroWithIpPrefixes(final List<String> ipPrefixes) {
+    Ero createEroWithIpPrefixes(final List<String> ipPrefixes) {
         final List<Subobject> subobjs = new ArrayList<>(ipPrefixes.size());
         final SubobjectBuilder subobjBuilder = new SubobjectBuilder();
         for (final String ipPrefix : ipPrefixes) {
@@ -167,8 +172,9 @@ public abstract class AbstractPCEPSessionTest<T extends TopologySessionListenerF
         return new EroBuilder().setSubobject(subobjs).build();
     }
 
-    protected String getLastEroIpPrefix(final Ero ero) {
-        return ((IpPrefixCase)ero.getSubobject().get(ero.getSubobject().size() - 1).getSubobjectType()).getIpPrefix().getIpPrefix().getIpv4Prefix().getValue();
+    String getLastEroIpPrefix(final Ero ero) {
+        return ((IpPrefixCase)ero.getSubobject().get(ero.getSubobject().size() - 1).getSubobjectType()).getIpPrefix()
+            .getIpPrefix().getIpv4Prefix().getValue();
     }
 
     protected Open getLocalPref() {
index 4780dc8d26af7e8196ea2d49e66e21de5ab961dd..06dc9425cdb1fabc8a06443e5df95345617d664d 100755 (executable)
@@ -323,7 +323,7 @@ public class Stateful07TopologySessionListenerTest extends AbstractPCEPSessionTe
         verify(this.listenerReg, times(0)).close();
         // send request
         final Future<RpcResult<AddLspOutput>> futureOutput = this.topologyRpcs.addLsp(createAddLspInput());
-        this.manager.close();
+        this.manager.closeServiceInstance();
         verify(this.listenerReg, times(1)).close();
         final AddLspOutput output = futureOutput.get().getResult();
         // deal with unsent request after session down
@@ -340,7 +340,7 @@ public class Stateful07TopologySessionListenerTest extends AbstractPCEPSessionTe
     @Test
     public void testOnServerSessionManagerUnstarted() throws InterruptedException, ExecutionException,
         TransactionCommitFailedException, ReadFailedException {
-        this.manager.close();
+        this.manager.closeServiceInstance();
         // the registration should not be closed since it's never initialized
         verify(this.listenerReg, times(0)).close();
         this.listener.onSessionUp(this.session);