BGPCEP-739: Fix "raced with transaction PingPongTransaction"
[bgpcep.git] / pcep / topology-provider / src / main / java / org / opendaylight / bgpcep / pcep / topology / provider / AbstractTopologySessionListener.java
index e38b7ed6b708f1c6a2677d19ccd9e8d6b2d41f22..ea8c2f1be42bb32ba42f4c61d28020bb1fc9ae64 100755 (executable)
@@ -186,7 +186,8 @@ public abstract class AbstractTopologySessionListener<S, L> implements TopologyS
             loadLspData(initialNodeState, this.lspData, this.lsps, isIncrementalSynchro());
             pccBuilder.setReportedLsp(initialNodeState.getAugmentation(Node1.class).getPathComputationClient().getReportedLsp());
         }
-        writeNode(pccBuilder, state, topologyAugment);
+        state.storeNode(topologyAugment,
+                new Node1Builder().setPathComputationClient(pccBuilder.build()).build(), this.session);
         register();
         if (this.registration == null) {
             LOG.error("PCEP session fails to register. Closing session {}", session);
@@ -198,36 +199,13 @@ public abstract class AbstractTopologySessionListener<S, L> implements TopologyS
         LOG.info("Session with {} attached to topology node {}", session.getRemoteAddress(), state.getNodeId());
     }
 
-    private void writeNode(final PathComputationClientBuilder pccBuilder, final TopologyNodeState state,
-            final InstanceIdentifier<Node1> topologyAugment) {
-        final Node1 ta = new Node1Builder().setPathComputationClient(pccBuilder.build()).build();
-
-        final ReadWriteTransaction trans = state.rwTransaction();
-        trans.put(LogicalDatastoreType.OPERATIONAL, topologyAugment, ta);
-        LOG.trace("Peer data {} set to {}", topologyAugment, ta);
-
-        // All set, commit the modifications
-        Futures.addCallback(trans.submit(), new FutureCallback<Void>() {
-            @Override
-            public void onSuccess(final Void result) {
-                LOG.trace("Internal state for session {} updated successfully", AbstractTopologySessionListener.this.session);
-            }
-
-            @Override
-            public void onFailure(final Throwable t) {
-                LOG.error("Failed to update internal state for session {}, terminating it", AbstractTopologySessionListener.this.session, t);
-                AbstractTopologySessionListener.this.session.close(TerminationReason.UNKNOWN);
-            }
-        });
-    }
-
     protected void updatePccState(final PccSyncState pccSyncState) {
         if (this.nodeState == null) {
             LOG.info("Server Session Manager is closed.");
             AbstractTopologySessionListener.this.session.close(TerminationReason.UNKNOWN);
             return;
         }
-        final MessageContext ctx = new MessageContext(this.nodeState.beginTransaction());
+        final MessageContext ctx = new MessageContext(this.nodeState.getChain().newWriteOnlyTransaction());
         updatePccNode(ctx, new PathComputationClientBuilder().setStateSync(pccSyncState).build());
         if (pccSyncState != PccSyncState.Synchronized) {
             this.synced = false;
@@ -237,12 +215,12 @@ public abstract class AbstractTopologySessionListener<S, L> implements TopologyS
         Futures.addCallback(ctx.trans.submit(), new FutureCallback<Void>() {
             @Override
             public void onSuccess(final Void result) {
-                LOG.trace("Internal state for session {} updated successfully", AbstractTopologySessionListener.this.session);
+                LOG.trace("Pcc Internal state for session {} updated successfully", AbstractTopologySessionListener.this.session);
             }
 
             @Override
             public void onFailure(final Throwable t) {
-                LOG.error("Failed to update internal state for session {}", AbstractTopologySessionListener.this.session, t);
+                LOG.error("Failed to update Pcc internal state for session {}", AbstractTopologySessionListener.this.session, t);
                 AbstractTopologySessionListener.this.session.close(TerminationReason.UNKNOWN);
             }
         });
@@ -318,7 +296,7 @@ public abstract class AbstractTopologySessionListener<S, L> implements TopologyS
             session.close(TerminationReason.UNKNOWN);
             return;
         }
-        final MessageContext ctx = new MessageContext(this.nodeState.beginTransaction());
+        final MessageContext ctx = new MessageContext(this.nodeState.getChain().newWriteOnlyTransaction());
 
         if (onMessage(ctx, message)) {
             LOG.warn("Unhandled message {} on session {}", message, session);