BGPCEP-739: Fix "raced with transaction PingPongTransaction" 96/67596/4
authorClaudio D. Gasparini <claudio.gasparini@pantheon.tech>
Fri, 26 Jan 2018 09:39:04 +0000 (10:39 +0100)
committerClaudio D. Gasparini <claudio.gasparini@pantheon.tech>
Mon, 26 Feb 2018 08:13:31 +0000 (09:13 +0100)
under TopologyNodeState. Raced call to putTopologyNode
will cause an override node and therefore PathComputationClient
was being removed and ending on exception when trying to update
it after such change.

Change-Id: I1d5b44c383d7159669bec7475afff3713f1370a0
Signed-off-by: Claudio D. Gasparini <claudio.gasparini@pantheon.tech>
pcep/topology-provider/src/main/java/org/opendaylight/bgpcep/pcep/topology/provider/AbstractTopologySessionListener.java
pcep/topology-provider/src/main/java/org/opendaylight/bgpcep/pcep/topology/provider/PCEPTopologyProvider.java
pcep/topology-provider/src/main/java/org/opendaylight/bgpcep/pcep/topology/provider/ServerSessionManager.java
pcep/topology-provider/src/main/java/org/opendaylight/bgpcep/pcep/topology/provider/Stateful07TopologySessionListener.java
pcep/topology-provider/src/main/java/org/opendaylight/bgpcep/pcep/topology/provider/TopologyNodeState.java
pcep/topology-provider/src/main/java/org/opendaylight/bgpcep/pcep/topology/provider/config/PCEPTopologyProviderBean.java
pcep/topology-provider/src/test/java/org/opendaylight/bgpcep/pcep/topology/provider/AbstractPCEPSessionTest.java

index e38b7ed6b708f1c6a2677d19ccd9e8d6b2d41f22..ea8c2f1be42bb32ba42f4c61d28020bb1fc9ae64 100755 (executable)
@@ -186,7 +186,8 @@ public abstract class AbstractTopologySessionListener<S, L> implements TopologyS
             loadLspData(initialNodeState, this.lspData, this.lsps, isIncrementalSynchro());
             pccBuilder.setReportedLsp(initialNodeState.getAugmentation(Node1.class).getPathComputationClient().getReportedLsp());
         }
-        writeNode(pccBuilder, state, topologyAugment);
+        state.storeNode(topologyAugment,
+                new Node1Builder().setPathComputationClient(pccBuilder.build()).build(), this.session);
         register();
         if (this.registration == null) {
             LOG.error("PCEP session fails to register. Closing session {}", session);
@@ -198,36 +199,13 @@ public abstract class AbstractTopologySessionListener<S, L> implements TopologyS
         LOG.info("Session with {} attached to topology node {}", session.getRemoteAddress(), state.getNodeId());
     }
 
-    private void writeNode(final PathComputationClientBuilder pccBuilder, final TopologyNodeState state,
-            final InstanceIdentifier<Node1> topologyAugment) {
-        final Node1 ta = new Node1Builder().setPathComputationClient(pccBuilder.build()).build();
-
-        final ReadWriteTransaction trans = state.rwTransaction();
-        trans.put(LogicalDatastoreType.OPERATIONAL, topologyAugment, ta);
-        LOG.trace("Peer data {} set to {}", topologyAugment, ta);
-
-        // All set, commit the modifications
-        Futures.addCallback(trans.submit(), new FutureCallback<Void>() {
-            @Override
-            public void onSuccess(final Void result) {
-                LOG.trace("Internal state for session {} updated successfully", AbstractTopologySessionListener.this.session);
-            }
-
-            @Override
-            public void onFailure(final Throwable t) {
-                LOG.error("Failed to update internal state for session {}, terminating it", AbstractTopologySessionListener.this.session, t);
-                AbstractTopologySessionListener.this.session.close(TerminationReason.UNKNOWN);
-            }
-        });
-    }
-
     protected void updatePccState(final PccSyncState pccSyncState) {
         if (this.nodeState == null) {
             LOG.info("Server Session Manager is closed.");
             AbstractTopologySessionListener.this.session.close(TerminationReason.UNKNOWN);
             return;
         }
-        final MessageContext ctx = new MessageContext(this.nodeState.beginTransaction());
+        final MessageContext ctx = new MessageContext(this.nodeState.getChain().newWriteOnlyTransaction());
         updatePccNode(ctx, new PathComputationClientBuilder().setStateSync(pccSyncState).build());
         if (pccSyncState != PccSyncState.Synchronized) {
             this.synced = false;
@@ -237,12 +215,12 @@ public abstract class AbstractTopologySessionListener<S, L> implements TopologyS
         Futures.addCallback(ctx.trans.submit(), new FutureCallback<Void>() {
             @Override
             public void onSuccess(final Void result) {
-                LOG.trace("Internal state for session {} updated successfully", AbstractTopologySessionListener.this.session);
+                LOG.trace("Pcc Internal state for session {} updated successfully", AbstractTopologySessionListener.this.session);
             }
 
             @Override
             public void onFailure(final Throwable t) {
-                LOG.error("Failed to update internal state for session {}", AbstractTopologySessionListener.this.session, t);
+                LOG.error("Failed to update Pcc internal state for session {}", AbstractTopologySessionListener.this.session, t);
                 AbstractTopologySessionListener.this.session.close(TerminationReason.UNKNOWN);
             }
         });
@@ -318,7 +296,7 @@ public abstract class AbstractTopologySessionListener<S, L> implements TopologyS
             session.close(TerminationReason.UNKNOWN);
             return;
         }
-        final MessageContext ctx = new MessageContext(this.nodeState.beginTransaction());
+        final MessageContext ctx = new MessageContext(this.nodeState.getChain().newWriteOnlyTransaction());
 
         if (onMessage(ctx, message)) {
             LOG.warn("Unhandled message {} on session {}", message, session);
index 71741df9f21ae5ff305795c207a6f60ab90143e3..73acea1510ccffada496ee12d6c4adece55161b8 100755 (executable)
@@ -106,7 +106,7 @@ public final class PCEPTopologyProvider extends DefaultTopologyReference {
                 new TopologyProgramming(this.scheduler, this.manager)));
         this.network.registerPath(NetworkTopologyContext.class, this.topology);
         try {
-            this.manager.instantiateServiceInstance().checkedGet();
+            this.manager.instantiateServiceInstance();
             final ChannelFuture channelFuture = this.dependenciesProvider.getPCEPDispatcher()
                 .createServer(this.address, this.keys, this.manager, this.manager);
             channelFuture.get();
index 8eb1fa454fd9557b180c79a37d44e8f1682e606f..13110181f09d2226701d3273008abd9afa5cfa6d 100755 (executable)
@@ -18,6 +18,7 @@ import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
@@ -85,11 +86,9 @@ final class ServerSessionManager implements PCEPSessionListenerFactory, Topology
     }
 
     /**
-     * Create Base Topology
-     *
-     * @throws TransactionCommitFailedException exception
+     * Create Base Topology.
      */
-    synchronized CheckedFuture<Void, TransactionCommitFailedException> instantiateServiceInstance() {
+    synchronized void instantiateServiceInstance() {
         final TopologyKey key = InstanceIdentifier.keyOf(this.topology);
         final TopologyId topologyId = key.getTopologyId();
         final WriteTransaction tx = this.broker.newWriteOnlyTransaction();
@@ -98,21 +97,14 @@ final class ServerSessionManager implements PCEPSessionListenerFactory, Topology
                 .addAugmentation(TopologyTypes1.class, new TopologyTypes1Builder().setTopologyPcep(
                     new TopologyPcepBuilder().build()).build()).build())
             .setNode(new ArrayList<>()).build(), true);
-        final CheckedFuture<Void, TransactionCommitFailedException> future = tx.submit();
-        Futures.addCallback(future, new FutureCallback<Void>() {
-            @Override
-            public void onSuccess(final Void result) {
-                LOG.debug("PCEP Topology {} created successfully.", topologyId.getValue());
-                ServerSessionManager.this.isClosed.set(false);
-            }
-
-            @Override
-            public void onFailure(final Throwable t) {
-                LOG.error("Failed to create PCEP Topology {}.", topologyId.getValue(), t);
-                ServerSessionManager.this.isClosed.set(true);
-            }
-        });
-        return future;
+        try {
+            tx.submit().get();
+            LOG.info("PCEP Topology {} created successfully.", topologyId.getValue());
+            ServerSessionManager.this.isClosed.set(false);
+        } catch (final ExecutionException | InterruptedException throwable) {
+            LOG.error("Failed to create PCEP Topology {}.", topologyId.getValue(), throwable);
+            ServerSessionManager.this.isClosed.set(true);
+        }
     }
 
     private static NodeId createNodeId(final InetAddress addr) {
index 4cbe6830c140c0e439006b9024e865c9b3fec26f..62a53167c8029fd3cdb152a32f8fdfee91fc6b3e 100644 (file)
@@ -409,7 +409,7 @@ class Stateful07TopologySessionListener extends AbstractTopologySessionListener<
     }
 
     @Override
-    protected boolean onMessage(final MessageContext ctx, final Message message) {
+    protected synchronized boolean onMessage(final MessageContext ctx, final Message message) {
         if (message instanceof PcerrMessage) {
             return handleErrorMessage((PcerrMessage) message);
         }
index b9403da534b15501996713a113cb35d59257def4..18ba3ea1c5c42b822a148ec0fa2776814d443be1 100644 (file)
@@ -12,20 +12,25 @@ import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import javax.annotation.concurrent.GuardedBy;
 import javax.annotation.concurrent.ThreadSafe;
 import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
-import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
+import org.opendaylight.protocol.pcep.PCEPSession;
+import org.opendaylight.protocol.pcep.TerminationReason;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev131024.lsp.metadata.Metadata;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.topology.pcep.rev131024.Node1;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
@@ -46,6 +51,7 @@ final class TopologyNodeState implements AutoCloseable, TransactionChainListener
     private final long holdStateNanos;
     private long lastReleased = 0;
     //cache initial node state, if any node was persisted
+    @GuardedBy("this")
     private Node initialNodeState = null;
 
     public TopologyNodeState(final DataBroker broker, final InstanceIdentifier<Topology> topology, final NodeId id, final long holdStateNanos) {
@@ -79,7 +85,7 @@ final class TopologyNodeState implements AutoCloseable, TransactionChainListener
         // The session went down. Undo all the Topology changes we have done.
         // We might want to persist topology node for later re-use.
         if (!persist) {
-            final WriteTransaction trans = beginTransaction();
+            final WriteTransaction trans = this.chain.newWriteOnlyTransaction();
             trans.delete(LogicalDatastoreType.OPERATIONAL, this.nodeId);
             Futures.addCallback(trans.submit(), new FutureCallback<Void>() {
                 @Override
@@ -106,23 +112,17 @@ final class TopologyNodeState implements AutoCloseable, TransactionChainListener
 
         //try to get the topology's node
         if (retrieveNode) {
-            Futures.addCallback(readOperationalData(this.nodeId), new FutureCallback<Optional<Node>>() {
-
-                @Override
-                public void onSuccess(final Optional<Node> result) {
-                    if (!result.isPresent()) {
-                        putTopologyNode();
-                    } else {
-                        //cache retrieved node
-                        TopologyNodeState.this.initialNodeState = result.get();
-                    }
-                }
-
-                @Override
-                public void onFailure(final Throwable t) {
-                    LOG.error("Failed to get topology node {}", TopologyNodeState.this.nodeId, t);
+            try {
+                final Optional<Node> prevNode = readOperationalData(this.nodeId).get();
+                if (!prevNode.isPresent()) {
+                    putTopologyNode();
+                } else {
+                    //cache retrieved node
+                    TopologyNodeState.this.initialNodeState = prevNode.get();
                 }
-            });
+            } catch (final ExecutionException | InterruptedException throwable) {
+                LOG.error("Failed to get topology node {}", TopologyNodeState.this.nodeId, throwable);
+            }
         } else {
             putTopologyNode();
         }
@@ -132,15 +132,11 @@ final class TopologyNodeState implements AutoCloseable, TransactionChainListener
         return this.initialNodeState;
     }
 
-    WriteTransaction beginTransaction() {
-        return this.chain.newWriteOnlyTransaction();
+    synchronized BindingTransactionChain getChain() {
+        return this.chain;
     }
 
-    ReadWriteTransaction rwTransaction() {
-        return this.chain.newReadWriteTransaction();
-    }
-
-    <T extends DataObject> ListenableFuture<Optional<T>> readOperationalData(final InstanceIdentifier<T> id) {
+    synchronized <T extends DataObject> ListenableFuture<Optional<T>> readOperationalData(final InstanceIdentifier<T> id) {
         try (final ReadOnlyTransaction t = this.chain.newReadOnlyTransaction()) {
             return t.read(LogicalDatastoreType.OPERATIONAL, id);
         }
@@ -158,15 +154,48 @@ final class TopologyNodeState implements AutoCloseable, TransactionChainListener
     }
 
     @Override
-    public void close() {
+    public synchronized void close() {
         this.chain.close();
     }
 
-    private void putTopologyNode() {
-        final Node node = new NodeBuilder().setKey(this.nodeId.getKey()).setNodeId(this.nodeId.getKey().getNodeId()).build();
-        final WriteTransaction t = beginTransaction();
-        t.put(LogicalDatastoreType.OPERATIONAL, this.nodeId, node);
-        t.submit();
+    private synchronized void putTopologyNode() {
+        final Node node = new NodeBuilder().setKey(this.nodeId.getKey())
+                .setNodeId(this.nodeId.getKey().getNodeId()).build();
+        final WriteTransaction t = this.chain.newWriteOnlyTransaction();
+        LOG.trace("Put topology Node {}, value {}", this.nodeId, node);
+        t.merge(LogicalDatastoreType.OPERATIONAL, this.nodeId, node);
+        Futures.addCallback(t.submit(), new FutureCallback<Void>() {
+            @Override
+            public void onSuccess(final Void result) {
+                LOG.trace("Topology Node stored {}, value {}", TopologyNodeState.this.nodeId, node);
+            }
+
+            @Override
+            public void onFailure(final Throwable throwable) {
+                LOG.trace("Put topology Node failed {}, value {}, {}", TopologyNodeState.this.nodeId, node, throwable);
+            }
+        }, MoreExecutors.directExecutor());
     }
 
+    public synchronized void storeNode(final InstanceIdentifier<Node1> topologyAugment, final Node1 ta,
+            final PCEPSession session) {
+        LOG.trace("Peer data {} set to {}", topologyAugment, ta);
+        final WriteTransaction trans = this.chain.newWriteOnlyTransaction();
+        trans.put(LogicalDatastoreType.OPERATIONAL, topologyAugment, ta);
+
+        // All set, commit the modifications
+        Futures.addCallback(trans.submit(), new FutureCallback<Void>() {
+            @Override
+            public void onSuccess(final Void result) {
+                LOG.trace("Node stored {} for session {} updated successfully", topologyAugment, session);
+            }
+
+            @Override
+            public void onFailure(final Throwable throwable) {
+                LOG.error("Failed to store node {} for session {}, terminating it",
+                        topologyAugment, session, throwable);
+                session.close(TerminationReason.UNKNOWN);
+            }
+        }, MoreExecutors.directExecutor());
+    }
 }
\ No newline at end of file
index b9caa8aca2f545799107cd8166e2184141b75a93..3d3d18bb9602ee72af4625289062f5048bde20de 100644 (file)
@@ -130,16 +130,14 @@ public final class PCEPTopologyProviderBean implements PCEPTopologyProviderDepen
         @Override
         public synchronized void instantiateServiceInstance() {
             LOG.info("PCEP Topology Provider Singleton Service {} instantiated", getIdentifier().getValue());
-            if (this.pcepTopoProvider != null) {
-                this.pcepTopoProvider.instantiateServiceInstance();
-                this.serviceInstantiated = true;
-            }
+            this.pcepTopoProvider.instantiateServiceInstance();
+            this.serviceInstantiated = true;
         }
 
         @Override
         public synchronized ListenableFuture<Void> closeServiceInstance() {
             LOG.info("Close PCEP Topology Provider Singleton Service {}", getIdentifier().getValue());
-            if (this.pcepTopoProvider != null && this.serviceInstantiated) {
+            if (this.serviceInstantiated) {
                 this.serviceInstantiated = false;
                 return this.pcepTopoProvider.closeServiceInstance();
             }
index a201d6517cf67fbeec4a43e4bb5f2c21421c1d7d..0cd6736b6d00a85c410ab4a8e8c0da0402990246 100644 (file)
@@ -9,17 +9,12 @@
 package org.opendaylight.bgpcep.pcep.topology.provider;
 
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 
-import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.MoreExecutors;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelHandler;
@@ -33,7 +28,6 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
-import javax.annotation.Nullable;
 import org.junit.After;
 import org.junit.Before;
 import org.mockito.Mock;
@@ -45,7 +39,6 @@ import org.opendaylight.controller.config.yang.pcep.topology.provider.PCEPTopolo
 import org.opendaylight.controller.config.yang.pcep.topology.provider.PCEPTopologyProviderRuntimeRegistration;
 import org.opendaylight.controller.config.yang.pcep.topology.provider.PCEPTopologyProviderRuntimeRegistrator;
 import org.opendaylight.controller.md.sal.binding.test.AbstractConcurrentDataBrokerTest;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.opendaylight.protocol.pcep.PCEPSessionListener;
 import org.opendaylight.protocol.pcep.impl.DefaultPCEPSessionNegotiator;
 import org.opendaylight.protocol.pcep.impl.PCEPSessionImpl;
@@ -166,23 +159,12 @@ public abstract class AbstractPCEPSessionTest<T extends TopologySessionListenerF
         this.topologyRpcs = new TopologyRPCs(this.manager);
     }
 
-    protected void startSessionManager() throws TransactionCommitFailedException, InterruptedException {
+    protected void startSessionManager() throws InterruptedException {
         this.manager.setRuntimeRootRegistrator(this.registrator);
-        final CheckedFuture<Void, TransactionCommitFailedException> future = this.manager.instantiateServiceInstance();
+        this.manager.instantiateServiceInstance();
         final CountDownLatch lock = new CountDownLatch(1);
-        Futures.addCallback(future, new FutureCallback<Void>() {
-            @Override
-            public void onSuccess(@Nullable final Void aVoid) {
-                lock.countDown();
-            }
-
-            @Override
-            public void onFailure(final Throwable throwable) {
-                // the test cannot continue
-                fail();
-            }
-        }, MoreExecutors.directExecutor());
-        future.checkedGet();
+        this.manager.instantiateServiceInstance();
+        lock.countDown();
         lock.await(5000, TimeUnit.MILLISECONDS);
         assertFalse(this.manager.isClosed.get());
     }