BUG-7768 Synchronize ServerSessionManager for PCEP 05/51705/1
authorKevin Wang <kevixw@gmail.com>
Thu, 9 Feb 2017 22:40:23 +0000 (14:40 -0800)
committerKevin Wang <kevixw@gmail.com>
Sat, 11 Feb 2017 00:23:22 +0000 (16:23 -0800)
Add the check so that session manager can only be closed once
and becomes unusable after closing.

A java.util.ConcurrentModificationException is possible to happen
due to the close() method of ServerSessionManager is not synchronized.
The situation mostly likely will happen during a controller reboot.
It may also happen when a PCEP config is updated. The ServerSessionManager
is being restarted while all the PCEP negotiation is still happening.

Change-Id: I0e0d387add046a4fe9ec5ac74a85f8350e238d60
Signed-off-by: Kevin Wang <kevixw@gmail.com>
pcep/topology-provider/src/main/java/org/opendaylight/bgpcep/pcep/topology/provider/AbstractTopologySessionListener.java
pcep/topology-provider/src/main/java/org/opendaylight/bgpcep/pcep/topology/provider/PCEPTopologyProvider.java
pcep/topology-provider/src/main/java/org/opendaylight/bgpcep/pcep/topology/provider/ServerSessionManager.java

index 6cda92f8cb336d78cca18918d12960015acb198f..59c58a55f12c8eabe3bdcee35804cae02784373b 100755 (executable)
@@ -23,7 +23,6 @@ import java.util.Map.Entry;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.TimeUnit;
-
 import javax.annotation.concurrent.GuardedBy;
 import org.opendaylight.controller.config.yang.pcep.topology.provider.ListenerStateRuntimeMXBean;
 import org.opendaylight.controller.config.yang.pcep.topology.provider.ListenerStateRuntimeRegistration;
@@ -146,6 +145,11 @@ public abstract class AbstractTopologySessionListener<S, L> implements PCEPSessi
         syncOptimization  = new SyncOptimization(session);
 
         final TopologyNodeState state = this.serverSessionManager.takeNodeState(peerAddress, this, isLspDbRetreived());
+        // takeNodeState(..) may fail when the server session manager is being restarted due to configuration change
+        if (state == null) {
+            this.onSessionDown(session, new RuntimeException("Unable to fetch topology node state for PCEP session with " + session.getRemoteAddress()));
+            return;
+        }
 
         this.session = session;
         this.nodeState = state;
@@ -169,9 +173,7 @@ public abstract class AbstractTopologySessionListener<S, L> implements PCEPSessi
         }
         writeNode(pccBuilder, state, topologyAugment);
         this.listenerState.init(session);
-        if (this.serverSessionManager.getRuntimeRootRegistration().isPresent()) {
-            this.registration = this.serverSessionManager.getRuntimeRootRegistration().get().register(this);
-        }
+        this.serverSessionManager.registerRuntimeRootRegistration(this);
         LOG.info("Session with {} attached to topology node {}", session.getRemoteAddress(), state.getNodeId());
     }
 
index 1b307ebff1202ef5ecd47bb23db6720f79548c15..e64a05b59a7700aefdb875e409087d2491eb5262 100755 (executable)
@@ -70,7 +70,7 @@ public final class PCEPTopologyProvider extends DefaultTopologyReference impleme
 
         final ServerSessionManager manager = new ServerSessionManager(dataBroker, topology, listenerFactory, rpcTimeout);
         if (runtimeRootRegistrator.isPresent()) {
-            manager.registerRuntimeRootRegistartion(runtimeRootRegistrator.get());
+            manager.setRuntimeRootRegistartion(runtimeRootRegistrator.get());
         }
         final ChannelFuture f = dispatcher.createServer(address, keys, manager, manager);
         f.get();
@@ -96,17 +96,17 @@ public final class PCEPTopologyProvider extends DefaultTopologyReference impleme
         }
 
         try {
-            PCEPTopologyProvider.this.network.close();
+            this.network.close();
         } catch (final Exception e) {
             LOG.error("Failed to unregister network-level RPCs", e);
         }
         try {
-            PCEPTopologyProvider.this.element.close();
+            this.element.close();
         } catch (final Exception e) {
             LOG.error("Failed to unregister element-level RPCs", e);
         }
         try {
-            PCEPTopologyProvider.this.manager.close();
+            this.manager.close();
         } catch (final Exception e) {
             LOG.error("Failed to shutdown session manager", e);
         }
index d9ba62f6cf3327a879d1f843d446f0509e010d66..92b49e0b715dc2a3b2d1ac62d0e47a1168f8cb5d 100755 (executable)
@@ -7,7 +7,6 @@
  */
 package org.opendaylight.bgpcep.pcep.topology.provider;
 
-import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.ListenableFuture;
 import java.net.InetAddress;
@@ -16,6 +15,10 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import javax.annotation.concurrent.GuardedBy;
+import org.opendaylight.controller.config.yang.pcep.topology.provider.ListenerStateRuntimeMXBean;
 import org.opendaylight.controller.config.yang.pcep.topology.provider.PCEPTopologyProviderRuntimeMXBean;
 import org.opendaylight.controller.config.yang.pcep.topology.provider.PCEPTopologyProviderRuntimeRegistration;
 import org.opendaylight.controller.config.yang.pcep.topology.provider.PCEPTopologyProviderRuntimeRegistrator;
@@ -42,7 +45,6 @@ import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyBuilder;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
-import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.TopologyTypesBuilder;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.slf4j.Logger;
@@ -55,14 +57,17 @@ final class ServerSessionManager implements PCEPSessionListenerFactory, AutoClos
     private static final Logger LOG = LoggerFactory.getLogger(ServerSessionManager.class);
     private static final long DEFAULT_HOLD_STATE_NANOS = TimeUnit.MINUTES.toNanos(5);
 
+    @GuardedBy("this")
     private final Map<NodeId, TopologySessionListener> nodes = new HashMap<>();
+    @GuardedBy("this")
     private final Map<NodeId, TopologyNodeState> state = new HashMap<>();
     private final TopologySessionListenerFactory listenerFactory;
     private final InstanceIdentifier<Topology> topology;
     private final DataBroker broker;
     private final PCEPStatefulPeerProposal peerProposal;
-    private Optional<PCEPTopologyProviderRuntimeRegistration> runtimeRootRegistration = Optional.absent();
+    private final AtomicBoolean isClosed = new AtomicBoolean(false);
     private final int rpcTimeout;
+    private final AtomicReference<PCEPTopologyProviderRuntimeRegistration> runtimeRootRegistration = new AtomicReference<>();
 
     public ServerSessionManager(final DataBroker broker, final InstanceIdentifier<Topology> topology,
             final TopologySessionListenerFactory listenerFactory, final int rpcTimeout) throws ReadFailedException, TransactionCommitFailedException {
@@ -78,7 +83,7 @@ final class ServerSessionManager implements PCEPSessionListenerFactory, AutoClos
         tx.put(LogicalDatastoreType.OPERATIONAL, topology, new TopologyBuilder().setKey(k).setTopologyId(k.getTopologyId()).setTopologyTypes(
                 new TopologyTypesBuilder().addAugmentation(TopologyTypes1.class,
                         new TopologyTypes1Builder().setTopologyPcep(new TopologyPcepBuilder().build()).build()).build()).setNode(
-                                new ArrayList<Node>()).build(), true);
+                new ArrayList<>()).build(), true);
         tx.submit().checkedGet();
     }
 
@@ -94,6 +99,10 @@ final class ServerSessionManager implements PCEPSessionListenerFactory, AutoClos
 
     synchronized TopologyNodeState takeNodeState(final InetAddress address, final TopologySessionListener sessionListener, final boolean retrieveNode) {
         final NodeId id = createNodeId(address);
+        if (this.isClosed.get()) {
+            LOG.error("Server Session Manager is closed. Unable to create topology node {} with listener {}", id, sessionListener);
+            return null;
+        }
 
         LOG.debug("Node {} requested by listener {}", id, sessionListener);
         TopologyNodeState ret = this.state.get(id);
@@ -157,27 +166,40 @@ final class ServerSessionManager implements PCEPSessionListenerFactory, AutoClos
     }
 
     @Override
-    public void close() throws TransactionCommitFailedException {
-        if (this.runtimeRootRegistration.isPresent()) {
-            this.runtimeRootRegistration.get().close();
+    public synchronized void close() throws TransactionCommitFailedException {
+        if (this.isClosed.getAndSet(true)) {
+            LOG.error("Session Manager has already been closed.");
+            return;
+        }
+        final PCEPTopologyProviderRuntimeRegistration runtimeReg = this.runtimeRootRegistration.getAndSet(null);
+        if (runtimeReg != null) {
+            runtimeReg.close();
         }
         for (final TopologySessionListener sessionListener : this.nodes.values()) {
             sessionListener.close();
         }
+        this.nodes.clear();
         for (final TopologyNodeState nodeState : this.state.values()) {
             nodeState.close();
         }
+        this.state.clear();
         final WriteTransaction t = this.broker.newWriteOnlyTransaction();
         t.delete(LogicalDatastoreType.OPERATIONAL, this.topology);
         t.submit().checkedGet();
     }
 
-    public void registerRuntimeRootRegistartion(final PCEPTopologyProviderRuntimeRegistrator runtimeRootRegistrator) {
-        this.runtimeRootRegistration = Optional.of(runtimeRootRegistrator.register(this));
+    public void setRuntimeRootRegistartion(final PCEPTopologyProviderRuntimeRegistrator runtimeRootRegistrator) {
+        if (!this.runtimeRootRegistration.compareAndSet(null, runtimeRootRegistrator.register(this))) {
+            LOG.error("Runtime root registration has been set before.");
+        }
     }
 
-    public Optional<PCEPTopologyProviderRuntimeRegistration> getRuntimeRootRegistration() {
-        return this.runtimeRootRegistration;
+    public void registerRuntimeRootRegistration(final ListenerStateRuntimeMXBean bean) {
+        final PCEPTopologyProviderRuntimeRegistration runtimeReg = this.runtimeRootRegistration.get();
+        if (runtimeReg != null) {
+            runtimeReg.register(bean);
+            LOG.trace("Bean {} is successfully registered.", bean.getPeerId());
+        }
     }
 
     @Override