BGPCEP-739: Fix "raced with transaction PingPongTransaction" 16/67016/20
authorClaudio D. Gasparini <claudio.gasparini@pantheon.tech>
Thu, 25 Jan 2018 10:31:32 +0000 (11:31 +0100)
committerClaudio D. Gasparini <claudio.gasparini@pantheon.tech>
Thu, 25 Jan 2018 15:07:01 +0000 (16:07 +0100)
under TopologyNodeState. Raced call to putTopologyNode
will cause an override node and therefore PathComputationClient
was being removed and ending on exception when trying to update
it after such change.

Change-Id: I1d5b44c383d7159669bec7475afff3713f1370a0
Signed-off-by: Claudio D. Gasparini <claudio.gasparini@pantheon.tech>
pcep/topology/topology-provider/src/main/java/org/opendaylight/bgpcep/pcep/topology/provider/AbstractTopologySessionListener.java
pcep/topology/topology-provider/src/main/java/org/opendaylight/bgpcep/pcep/topology/provider/PCEPTopologyProvider.java
pcep/topology/topology-provider/src/main/java/org/opendaylight/bgpcep/pcep/topology/provider/ServerSessionManager.java
pcep/topology/topology-provider/src/main/java/org/opendaylight/bgpcep/pcep/topology/provider/Stateful07TopologySessionListener.java
pcep/topology/topology-provider/src/main/java/org/opendaylight/bgpcep/pcep/topology/provider/TopologyNodeState.java
pcep/topology/topology-provider/src/main/java/org/opendaylight/bgpcep/pcep/topology/provider/config/PCEPTopologyProviderBean.java
pcep/topology/topology-provider/src/test/java/org/opendaylight/bgpcep/pcep/topology/provider/AbstractPCEPSessionTest.java

index b43b1c01473ed483a9c7a5e8761df3d4056b5737..fbbc337a83a421a2db843e09a6d050404eb7f37b 100755 (executable)
@@ -31,7 +31,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import javax.annotation.concurrent.GuardedBy;
 import org.opendaylight.bgpcep.pcep.topology.provider.session.stats.SessionStateImpl;
 import org.opendaylight.bgpcep.pcep.topology.provider.session.stats.TopologySessionStats;
-import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.protocol.pcep.PCEPCloseTermination;
@@ -168,43 +167,19 @@ public abstract class AbstractTopologySessionListener<S, L> implements TopologyS
             pccBuilder.setReportedLsp(initialNodeState.getAugmentation(Node1.class)
                     .getPathComputationClient().getReportedLsp());
         }
-        writeNode(pccBuilder, state, topologyAugment);
+        state.storeNode(topologyAugment,
+                new Node1Builder().setPathComputationClient(pccBuilder.build()).build(), this.session);
         this.listenerState.init(psession);
         LOG.info("Session with {} attached to topology node {}", psession.getRemoteAddress(), state.getNodeId());
     }
 
-    private void writeNode(final PathComputationClientBuilder pccBuilder, final TopologyNodeState state,
-            final InstanceIdentifier<Node1> topologyAugment) {
-        final Node1 ta = new Node1Builder().setPathComputationClient(pccBuilder.build()).build();
-
-        final ReadWriteTransaction trans = state.rwTransaction();
-        trans.put(LogicalDatastoreType.OPERATIONAL, topologyAugment, ta);
-        LOG.trace("Peer data {} set to {}", topologyAugment, ta);
-
-        // All set, commit the modifications
-        Futures.addCallback(trans.submit(), new FutureCallback<Void>() {
-            @Override
-            public void onSuccess(final Void result) {
-                LOG.trace("Internal state for session {} updated successfully",
-                        AbstractTopologySessionListener.this.session);
-            }
-
-            @Override
-            public void onFailure(final Throwable throwable) {
-                LOG.error("Failed to update internal state for session {}, terminating it",
-                        AbstractTopologySessionListener.this.session, throwable);
-                AbstractTopologySessionListener.this.session.close(TerminationReason.UNKNOWN);
-            }
-        }, MoreExecutors.directExecutor());
-    }
-
     synchronized void updatePccState(final PccSyncState pccSyncState) {
         if (this.nodeState == null) {
             LOG.info("Server Session Manager is closed.");
             AbstractTopologySessionListener.this.session.close(TerminationReason.UNKNOWN);
             return;
         }
-        final MessageContext ctx = new MessageContext(this.nodeState.beginTransaction());
+        final MessageContext ctx = new MessageContext(this.nodeState.getChain().newWriteOnlyTransaction());
         updatePccNode(ctx, new PathComputationClientBuilder().setStateSync(pccSyncState).build());
         if (pccSyncState != PccSyncState.Synchronized) {
             this.synced.set(false);
@@ -214,13 +189,13 @@ public abstract class AbstractTopologySessionListener<S, L> implements TopologyS
         Futures.addCallback(ctx.trans.submit(), new FutureCallback<Void>() {
             @Override
             public void onSuccess(final Void result) {
-                LOG.trace("Internal state for session {} updated successfully",
+                LOG.trace("Pcc Internal state for session {} updated successfully",
                         AbstractTopologySessionListener.this.session);
             }
 
             @Override
             public void onFailure(final Throwable throwable) {
-                LOG.error("Failed to update internal state for session {}",
+                LOG.error("Failed to update Pcc internal state for session {}",
                         AbstractTopologySessionListener.this.session, throwable);
                 AbstractTopologySessionListener.this.session.close(TerminationReason.UNKNOWN);
             }
@@ -298,7 +273,7 @@ public abstract class AbstractTopologySessionListener<S, L> implements TopologyS
             psession.close(TerminationReason.UNKNOWN);
             return;
         }
-        final MessageContext ctx = new MessageContext(this.nodeState.beginTransaction());
+        final MessageContext ctx = new MessageContext(this.nodeState.getChain().newWriteOnlyTransaction());
 
         if (onMessage(ctx, message)) {
             LOG.warn("Unhandled message {} on session {}", message, psession);
index 66c62ee8da3738034b11cfd9906253db92f11904..8279b469be9a96fa300e1acd7b8f81ed2f626673 100755 (executable)
@@ -82,7 +82,7 @@ public final class PCEPTopologyProvider extends DefaultTopologyReference {
                         new TopologyProgramming(this.scheduler, this.manager)));
         this.network.registerPath(NetworkTopologyContext.class, this.configDependencies.getTopology());
 
-        this.manager.instantiateServiceInstance().get();
+        this.manager.instantiateServiceInstance();
         final ChannelFuture channelFuture = this.dependenciesProvider.getPCEPDispatcher()
                 .createServer(this.manager.getPCEPDispatcherDependencies());
         channelFuture.get();
index cf92e0386c6c4fb50f8b1b56f18b814f843e8787..56f8f6c014434068f62b54c4ac99d57dab2a19fa 100755 (executable)
@@ -19,6 +19,7 @@ import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import javax.annotation.concurrent.GuardedBy;
@@ -98,7 +99,7 @@ final class ServerSessionManager implements PCEPSessionListenerFactory, Topology
     /**
      * Create Base Topology.
      */
-    synchronized ListenableFuture<Void> instantiateServiceInstance() {
+    synchronized void instantiateServiceInstance() {
         final TopologyKey key = InstanceIdentifier.keyOf(this.topology);
         final TopologyId topologyId = key.getTopologyId();
         final WriteTransaction tx = this.dependenciesProvider.getDataBroker().newWriteOnlyTransaction();
@@ -107,21 +108,14 @@ final class ServerSessionManager implements PCEPSessionListenerFactory, Topology
                         .addAugmentation(TopologyTypes1.class, new TopologyTypes1Builder().setTopologyPcep(
                                 new TopologyPcepBuilder().build()).build()).build())
                 .setNode(new ArrayList<>()).build(), true);
-        final ListenableFuture<Void> future = tx.submit();
-        Futures.addCallback(future, new FutureCallback<Void>() {
-            @Override
-            public void onSuccess(final Void result) {
-                LOG.debug("PCEP Topology {} created successfully.", topologyId.getValue());
-                ServerSessionManager.this.isClosed.set(false);
-            }
-
-            @Override
-            public void onFailure(final Throwable throwable) {
-                LOG.error("Failed to create PCEP Topology {}.", topologyId.getValue(), throwable);
-                ServerSessionManager.this.isClosed.set(true);
-            }
-        }, MoreExecutors.directExecutor());
-        return future;
+        try {
+            tx.submit().get();
+            LOG.info("PCEP Topology {} created successfully.", topologyId.getValue());
+            ServerSessionManager.this.isClosed.set(false);
+        } catch (final ExecutionException | InterruptedException throwable) {
+            LOG.error("Failed to create PCEP Topology {}.", topologyId.getValue(), throwable);
+            ServerSessionManager.this.isClosed.set(true);
+        }
     }
 
     synchronized void releaseNodeState(final TopologyNodeState nodeState, final PCEPSession session,
index d5d36e43b7e4487d56aa21a25e37187384973bbb..0b92bb0a29259b149da1f7380cdc9ab7310bfcc0 100644 (file)
@@ -364,7 +364,7 @@ class Stateful07TopologySessionListener extends AbstractTopologySessionListener<
     }
 
     @Override
-    protected boolean onMessage(final MessageContext ctx, final Message message) {
+    protected synchronized boolean onMessage(final MessageContext ctx, final Message message) {
         if (message instanceof PcerrMessage) {
             return handleErrorMessage((PcerrMessage) message);
         }
index f0d28066c4d772af5c99d299477bf1c3666ab1c2..41b9286ab4e7aff2d648e987b19667e32b8a39cc 100644 (file)
@@ -16,17 +16,21 @@ import com.google.common.util.concurrent.MoreExecutors;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ExecutionException;
 import javax.annotation.Nonnull;
+import javax.annotation.concurrent.GuardedBy;
 import javax.annotation.concurrent.ThreadSafe;
 import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
-import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
+import org.opendaylight.protocol.pcep.PCEPSession;
+import org.opendaylight.protocol.pcep.TerminationReason;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev171025.Node1;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev171025.lsp.metadata.Metadata;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
@@ -48,6 +52,7 @@ final class TopologyNodeState implements AutoCloseable, TransactionChainListener
     private final long holdStateNanos;
     private long lastReleased = 0;
     //cache initial node state, if any node was persisted
+    @GuardedBy("this")
     private Node initialNodeState = null;
 
     TopologyNodeState(final DataBroker broker, final InstanceIdentifier<Topology> topology, final NodeId id,
@@ -83,7 +88,7 @@ final class TopologyNodeState implements AutoCloseable, TransactionChainListener
         // The session went down. Undo all the Topology changes we have done.
         // We might want to persist topology node for later re-use.
         if (!persist) {
-            final WriteTransaction trans = beginTransaction();
+            final WriteTransaction trans = this.chain.newWriteOnlyTransaction();
             trans.delete(LogicalDatastoreType.OPERATIONAL, this.nodeId);
             Futures.addCallback(trans.submit(), new FutureCallback<Void>() {
                 @Override
@@ -111,22 +116,17 @@ final class TopologyNodeState implements AutoCloseable, TransactionChainListener
 
         //try to get the topology's node
         if (retrieveNode) {
-            Futures.addCallback(readOperationalData(this.nodeId), new FutureCallback<Optional<Node>>() {
-                @Override
-                public void onSuccess(@Nonnull final Optional<Node> result) {
-                    if (!result.isPresent()) {
-                        putTopologyNode();
-                    } else {
-                        //cache retrieved node
-                        TopologyNodeState.this.initialNodeState = result.get();
-                    }
-                }
-
-                @Override
-                public void onFailure(final Throwable throwable) {
-                    LOG.error("Failed to get topology node {}", TopologyNodeState.this.nodeId, throwable);
+            try {
+                final Optional<Node> prevNode = readOperationalData(this.nodeId).get();
+                if (!prevNode.isPresent()) {
+                    putTopologyNode();
+                } else {
+                    //cache retrieved node
+                    TopologyNodeState.this.initialNodeState = prevNode.get();
                 }
-            }, MoreExecutors.directExecutor());
+            } catch (final ExecutionException | InterruptedException throwable) {
+                LOG.error("Failed to get topology node {}", TopologyNodeState.this.nodeId, throwable);
+            }
         } else {
             putTopologyNode();
         }
@@ -136,15 +136,12 @@ final class TopologyNodeState implements AutoCloseable, TransactionChainListener
         return this.initialNodeState;
     }
 
-    WriteTransaction beginTransaction() {
-        return this.chain.newWriteOnlyTransaction();
-    }
-
-    ReadWriteTransaction rwTransaction() {
-        return this.chain.newReadWriteTransaction();
+    synchronized BindingTransactionChain getChain() {
+        return this.chain;
     }
 
-    <T extends DataObject> ListenableFuture<Optional<T>> readOperationalData(final InstanceIdentifier<T> id) {
+    synchronized <T extends DataObject> ListenableFuture<Optional<T>> readOperationalData(
+            final InstanceIdentifier<T> id) {
         try (ReadOnlyTransaction t = this.chain.newReadOnlyTransaction()) {
             return t.read(LogicalDatastoreType.OPERATIONAL, id);
         }
@@ -164,16 +161,48 @@ final class TopologyNodeState implements AutoCloseable, TransactionChainListener
     }
 
     @Override
-    public void close() {
+    public synchronized void close() {
         this.chain.close();
     }
 
-    private void putTopologyNode() {
+    private synchronized void putTopologyNode() {
         final Node node = new NodeBuilder().setKey(this.nodeId.getKey())
                 .setNodeId(this.nodeId.getKey().getNodeId()).build();
-        final WriteTransaction t = beginTransaction();
-        t.put(LogicalDatastoreType.OPERATIONAL, this.nodeId, node);
-        t.submit();
+        final WriteTransaction t = this.chain.newWriteOnlyTransaction();
+        LOG.trace("Put topology Node {}, value {}", this.nodeId, node);
+        t.merge(LogicalDatastoreType.OPERATIONAL, this.nodeId, node);
+        Futures.addCallback(t.submit(), new FutureCallback<Void>() {
+            @Override
+            public void onSuccess(final Void result) {
+                LOG.trace("Topology Node stored {}, value {}", TopologyNodeState.this.nodeId, node);
+            }
+
+            @Override
+            public void onFailure(final Throwable throwable) {
+                LOG.trace("Put topology Node failed {}, value {}, {}", TopologyNodeState.this.nodeId, node, throwable);
+            }
+        }, MoreExecutors.directExecutor());
     }
 
+    public synchronized void storeNode(final InstanceIdentifier<Node1> topologyAugment, final Node1 ta,
+            final PCEPSession session) {
+        LOG.trace("Peer data {} set to {}", topologyAugment, ta);
+        final WriteTransaction trans = this.chain.newWriteOnlyTransaction();
+        trans.put(LogicalDatastoreType.OPERATIONAL, topologyAugment, ta);
+
+        // All set, commit the modifications
+        Futures.addCallback(trans.submit(), new FutureCallback<Void>() {
+            @Override
+            public void onSuccess(final Void result) {
+                LOG.trace("Node stored {} for session {} updated successfully", topologyAugment, session);
+            }
+
+            @Override
+            public void onFailure(final Throwable throwable) {
+                LOG.error("Failed to store node {} for session {}, terminating it",
+                        topologyAugment, session, throwable);
+                session.close(TerminationReason.UNKNOWN);
+            }
+        }, MoreExecutors.directExecutor());
+    }
 }
\ No newline at end of file
index ef4ddb24984228f89b06e4dc10372476413970e8..949e240dc88fa0f53d105e5a213f8de08293ef1d 100644 (file)
@@ -153,20 +153,18 @@ public final class PCEPTopologyProviderBean implements PCEPTopologyProviderDepen
         @SuppressWarnings("checkstyle:IllegalCatch")
         public synchronized void instantiateServiceInstance() {
             LOG.info("PCEP Topology Provider Singleton Service {} instantiated", getIdentifier().getValue());
-            if (this.pcepTopoProvider != null) {
-                try {
-                    this.pcepTopoProvider.instantiateServiceInstance();
-                } catch (final Exception e) {
-                    LOG.error("Failed to instantiate PCEP Topology provider", e);
-                }
-                this.serviceInstantiated = true;
+            try {
+                this.pcepTopoProvider.instantiateServiceInstance();
+            } catch (final Exception e) {
+                LOG.error("Failed to instantiate PCEP Topology provider", e);
             }
+            this.serviceInstantiated = true;
         }
 
         @Override
         public synchronized ListenableFuture<Void> closeServiceInstance() {
             LOG.info("Close PCEP Topology Provider Singleton Service {}", getIdentifier().getValue());
-            if (this.pcepTopoProvider != null && this.serviceInstantiated) {
+            if (this.serviceInstantiated) {
                 this.serviceInstantiated = false;
                 return this.pcepTopoProvider.closeServiceInstance();
             }
index c666d195048c325ce6581997b3388699032be10d..8333e30a5a2d605347deca0fdc12433ffd1c6c95 100644 (file)
@@ -16,7 +16,6 @@ import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.opendaylight.protocol.util.CheckUtil.checkEquals;
 
-import com.google.common.util.concurrent.ListenableFuture;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelHandler;
@@ -161,8 +160,7 @@ public abstract class AbstractPCEPSessionTest<T extends TopologySessionListenerF
     }
 
     void startSessionManager() throws Exception {
-        final ListenableFuture<Void> future = this.manager.instantiateServiceInstance();
-        future.get();
+        this.manager.instantiateServiceInstance();
         checkEquals(() -> assertFalse(this.manager.isClosed.get()));
     }