BUG-8056: make doCommit/finishCommit package-private
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / FrontendTransaction.java
index 9240aab26c22cb8402941d01a829ef68a6855d56..8846467b59d2d924f783014215d947551a498f21 100644 (file)
@@ -7,58 +7,30 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
-import com.google.common.base.Optional;
+import com.google.common.base.MoreObjects;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Verify;
-import com.google.common.primitives.UnsignedLong;
-import com.google.common.util.concurrent.FutureCallback;
 import java.util.ArrayDeque;
 import java.util.Iterator;
 import java.util.Queue;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.NotThreadSafe;
-import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest;
-import org.opendaylight.controller.cluster.access.commands.ExistsTransactionRequest;
-import org.opendaylight.controller.cluster.access.commands.ExistsTransactionSuccess;
-import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest;
-import org.opendaylight.controller.cluster.access.commands.ModifyTransactionSuccess;
 import org.opendaylight.controller.cluster.access.commands.OutOfOrderRequestException;
-import org.opendaylight.controller.cluster.access.commands.PersistenceProtocol;
-import org.opendaylight.controller.cluster.access.commands.ReadTransactionRequest;
-import org.opendaylight.controller.cluster.access.commands.ReadTransactionSuccess;
-import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest;
-import org.opendaylight.controller.cluster.access.commands.TransactionAbortSuccess;
-import org.opendaylight.controller.cluster.access.commands.TransactionCanCommitSuccess;
-import org.opendaylight.controller.cluster.access.commands.TransactionCommitSuccess;
-import org.opendaylight.controller.cluster.access.commands.TransactionDelete;
-import org.opendaylight.controller.cluster.access.commands.TransactionDoCommitRequest;
-import org.opendaylight.controller.cluster.access.commands.TransactionMerge;
-import org.opendaylight.controller.cluster.access.commands.TransactionModification;
-import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitRequest;
-import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitSuccess;
 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
 import org.opendaylight.controller.cluster.access.commands.TransactionSuccess;
-import org.opendaylight.controller.cluster.access.commands.TransactionWrite;
 import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
 import org.opendaylight.controller.cluster.access.concepts.RequestException;
 import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
-import org.opendaylight.controller.cluster.access.concepts.UnsupportedRequestException;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.opendaylight.yangtools.concepts.Identifiable;
 
 /**
- * Frontend transaction state as observed by the shard leader.
+ * Frontend common transaction state as observed by the shard leader.
  *
  * @author Robert Varga
  */
 @NotThreadSafe
-final class FrontendTransaction {
-    private static final Logger LOG = LoggerFactory.getLogger(FrontendTransaction.class);
-
+abstract class FrontendTransaction implements Identifiable<TransactionIdentifier> {
     private final AbstractFrontendHistory history;
     private final TransactionIdentifier id;
 
@@ -72,35 +44,21 @@ final class FrontendTransaction {
     private Long lastPurgedSequence;
     private long expectedSequence;
 
-    private ReadWriteShardDataTreeTransaction openTransaction;
-    private DataTreeModification sealedModification;
-    private ShardDataTreeCohort readyCohort;
-
-    private FrontendTransaction(final AbstractFrontendHistory history, final TransactionIdentifier id,
-            final ReadWriteShardDataTreeTransaction transaction) {
+    FrontendTransaction(final AbstractFrontendHistory history, final TransactionIdentifier id) {
         this.history = Preconditions.checkNotNull(history);
         this.id = Preconditions.checkNotNull(id);
-        this.openTransaction = Preconditions.checkNotNull(transaction);
     }
 
-    private FrontendTransaction(final AbstractFrontendHistory history, final TransactionIdentifier id,
-            final DataTreeModification mod) {
-        this.history = Preconditions.checkNotNull(history);
-        this.id = Preconditions.checkNotNull(id);
-        this.sealedModification = Preconditions.checkNotNull(mod);
+    @Override
+    public final TransactionIdentifier getIdentifier() {
+        return id;
     }
 
-    static FrontendTransaction createOpen(final AbstractFrontendHistory history,
-            final ReadWriteShardDataTreeTransaction transaction) {
-        return new FrontendTransaction(history, transaction.getIdentifier(), transaction);
+    final AbstractFrontendHistory history() {
+        return history;
     }
 
-    static FrontendTransaction createReady(final AbstractFrontendHistory history, final TransactionIdentifier id,
-            final DataTreeModification mod) {
-        return new FrontendTransaction(history, id, mod);
-    }
-
-    java.util.Optional<TransactionSuccess<?>> replaySequence(final long sequence) throws RequestException {
+    final java.util.Optional<TransactionSuccess<?>> replaySequence(final long sequence) throws RequestException {
         // Fast path check: if the requested sequence is the next request, bail early
         if (expectedSequence == sequence) {
             return java.util.Optional.empty();
@@ -144,36 +102,15 @@ final class FrontendTransaction {
         return java.util.Optional.empty();
     }
 
-    void purgeSequencesUpTo(final long sequence) {
+    final void purgeSequencesUpTo(final long sequence) {
         // FIXME: implement this
 
         lastPurgedSequence = sequence;
     }
 
     // Sequence has already been checked
-    @Nullable TransactionSuccess<?> handleRequest(final TransactionRequest<?> request, final RequestEnvelope envelope,
-            final long now) throws RequestException {
-        if (request instanceof ModifyTransactionRequest) {
-            return handleModifyTransaction((ModifyTransactionRequest) request, envelope, now);
-        } else if (request instanceof CommitLocalTransactionRequest) {
-            handleCommitLocalTransaction((CommitLocalTransactionRequest) request, envelope, now);
-            return null;
-        } else if (request instanceof ExistsTransactionRequest) {
-            return handleExistsTransaction((ExistsTransactionRequest) request);
-        } else if (request instanceof ReadTransactionRequest) {
-            return handleReadTransaction((ReadTransactionRequest) request);
-        } else if (request instanceof TransactionPreCommitRequest) {
-            handleTransactionPreCommit((TransactionPreCommitRequest) request, envelope, now);
-            return null;
-        } else if (request instanceof TransactionDoCommitRequest) {
-            handleTransactionDoCommit((TransactionDoCommitRequest) request, envelope, now);
-            return null;
-        } else if (request instanceof TransactionAbortRequest) {
-            return handleTransactionAbort((TransactionAbortRequest) request, envelope, now);
-        } else {
-            throw new UnsupportedRequestException(request);
-        }
-    }
+    abstract @Nullable TransactionSuccess<?> handleRequest(TransactionRequest<?> request,
+            RequestEnvelope envelope, long now) throws RequestException;
 
     private void recordResponse(final long sequence, final Object response) {
         if (replayQueue.isEmpty()) {
@@ -183,7 +120,7 @@ final class FrontendTransaction {
         expectedSequence++;
     }
 
-    private <T extends TransactionSuccess<?>> T recordSuccess(final long sequence, final T success) {
+    final <T extends TransactionSuccess<?>> T recordSuccess(final long sequence, final T success) {
         recordResponse(sequence, success);
         return success;
     }
@@ -192,215 +129,24 @@ final class FrontendTransaction {
         return history.readTime() - startTime;
     }
 
-    private void recordAndSendSuccess(final RequestEnvelope envelope, final long startTime,
+    final void recordAndSendSuccess(final RequestEnvelope envelope, final long startTime,
             final TransactionSuccess<?> success) {
         recordResponse(success.getSequence(), success);
         envelope.sendSuccess(success, executionTime(startTime));
     }
 
-    private void recordAndSendFailure(final RequestEnvelope envelope, final long startTime,
+    final void recordAndSendFailure(final RequestEnvelope envelope, final long startTime,
             final RuntimeRequestException failure) {
         recordResponse(envelope.getMessage().getSequence(), failure);
         envelope.sendFailure(failure, executionTime(startTime));
     }
 
-    private void handleTransactionPreCommit(final TransactionPreCommitRequest request,
-            final RequestEnvelope envelope, final long now) throws RequestException {
-        readyCohort.preCommit(new FutureCallback<DataTreeCandidate>() {
-            @Override
-            public void onSuccess(final DataTreeCandidate result) {
-                recordAndSendSuccess(envelope, now, new TransactionPreCommitSuccess(readyCohort.getIdentifier(),
-                    request.getSequence()));
-            }
-
-            @Override
-            public void onFailure(final Throwable failure) {
-                recordAndSendFailure(envelope, now, new RuntimeRequestException("Precommit failed", failure));
-                readyCohort = null;
-            }
-        });
-    }
-
-    private void handleTransactionDoCommit(final TransactionDoCommitRequest request, final RequestEnvelope envelope,
-            final long now) throws RequestException {
-        readyCohort.commit(new FutureCallback<UnsignedLong>() {
-            @Override
-            public void onSuccess(final UnsignedLong result) {
-                successfulCommit(envelope, now);
-            }
-
-            @Override
-            public void onFailure(final Throwable failure) {
-                recordAndSendFailure(envelope, now, new RuntimeRequestException("Commit failed", failure));
-                readyCohort = null;
-            }
-        });
-    }
-
-    private TransactionSuccess<?> handleTransactionAbort(final TransactionAbortRequest request,
-            final RequestEnvelope envelope, final long now) throws RequestException {
-        if (readyCohort == null) {
-            openTransaction.abort();
-            return new TransactionAbortSuccess(id, request.getSequence());
-        }
-
-        readyCohort.abort(new FutureCallback<Void>() {
-            @Override
-            public void onSuccess(final Void result) {
-                readyCohort = null;
-                recordAndSendSuccess(envelope, now, new TransactionAbortSuccess(id, request.getSequence()));
-                LOG.debug("Transaction {} aborted", id);
-            }
-
-            @Override
-            public void onFailure(final Throwable failure) {
-                readyCohort = null;
-                LOG.warn("Transaction {} abort failed", id, failure);
-                recordAndSendFailure(envelope, now, new RuntimeRequestException("Abort failed", failure));
-            }
-        });
-        return null;
-    }
-
-    private void coordinatedCommit(final RequestEnvelope envelope, final long now) {
-        readyCohort.canCommit(new FutureCallback<Void>() {
-            @Override
-            public void onSuccess(final Void result) {
-                recordAndSendSuccess(envelope, now, new TransactionCanCommitSuccess(readyCohort.getIdentifier(),
-                    envelope.getMessage().getSequence()));
-            }
-
-            @Override
-            public void onFailure(final Throwable failure) {
-                recordAndSendFailure(envelope, now, new RuntimeRequestException("CanCommit failed", failure));
-                readyCohort = null;
-            }
-        });
-    }
-
-    private void directCommit(final RequestEnvelope envelope, final long now) {
-        readyCohort.canCommit(new FutureCallback<Void>() {
-            @Override
-            public void onSuccess(final Void result) {
-                successfulDirectCanCommit(envelope, now);
-            }
-
-            @Override
-            public void onFailure(final Throwable failure) {
-                recordAndSendFailure(envelope, now, new RuntimeRequestException("CanCommit failed", failure));
-                readyCohort = null;
-            }
-        });
-
-    }
-
-    private void successfulDirectCanCommit(final RequestEnvelope envelope, final long startTime) {
-        readyCohort.preCommit(new FutureCallback<DataTreeCandidate>() {
-            @Override
-            public void onSuccess(final DataTreeCandidate result) {
-                successfulDirectPreCommit(envelope, startTime);
-            }
-
-            @Override
-            public void onFailure(final Throwable failure) {
-                recordAndSendFailure(envelope, startTime, new RuntimeRequestException("PreCommit failed", failure));
-                readyCohort = null;
-            }
-        });
+    @Override
+    public final String toString() {
+        return MoreObjects.toStringHelper(this).omitNullValues().add("identifier", getIdentifier())
+                .add("expectedSequence", expectedSequence).add("firstReplaySequence", firstReplaySequence)
+                .add("lastPurgedSequence", lastPurgedSequence)
+                .toString();
     }
 
-    private void successfulDirectPreCommit(final RequestEnvelope envelope, final long startTime) {
-        readyCohort.commit(new FutureCallback<UnsignedLong>() {
-
-            @Override
-            public void onSuccess(final UnsignedLong result) {
-                successfulCommit(envelope, startTime);
-            }
-
-            @Override
-            public void onFailure(final Throwable failure) {
-                recordAndSendFailure(envelope, startTime, new RuntimeRequestException("DoCommit failed", failure));
-                readyCohort = null;
-            }
-        });
-    }
-
-    private void successfulCommit(final RequestEnvelope envelope, final long startTime) {
-        recordAndSendSuccess(envelope, startTime, new TransactionCommitSuccess(readyCohort.getIdentifier(),
-            envelope.getMessage().getSequence()));
-        readyCohort = null;
-    }
-
-    private void handleCommitLocalTransaction(final CommitLocalTransactionRequest request,
-            final RequestEnvelope envelope, final long now) throws RequestException {
-        if (sealedModification.equals(request.getModification())) {
-            readyCohort = history.createReadyCohort(id, sealedModification);
-
-            if (request.isCoordinated()) {
-                coordinatedCommit(envelope, now);
-            } else {
-                directCommit(envelope, now);
-            }
-        } else {
-            throw new UnsupportedRequestException(request);
-        }
-    }
-
-    private ExistsTransactionSuccess handleExistsTransaction(final ExistsTransactionRequest request)
-            throws RequestException {
-        final Optional<NormalizedNode<?, ?>> data = openTransaction.getSnapshot().readNode(request.getPath());
-        return recordSuccess(request.getSequence(), new ExistsTransactionSuccess(id, request.getSequence(),
-            data.isPresent()));
-    }
-
-    private ReadTransactionSuccess handleReadTransaction(final ReadTransactionRequest request)
-            throws RequestException {
-        final Optional<NormalizedNode<?, ?>> data = openTransaction.getSnapshot().readNode(request.getPath());
-        return recordSuccess(request.getSequence(), new ReadTransactionSuccess(id, request.getSequence(), data));
-    }
-
-    private ModifyTransactionSuccess replyModifySuccess(final long sequence) {
-        return recordSuccess(sequence, new ModifyTransactionSuccess(id, sequence));
-    }
-
-    private @Nullable TransactionSuccess<?> handleModifyTransaction(final ModifyTransactionRequest request,
-            final RequestEnvelope envelope, final long now) throws RequestException {
-
-        final DataTreeModification modification = openTransaction.getSnapshot();
-        for (TransactionModification m : request.getModifications()) {
-            if (m instanceof TransactionDelete) {
-                modification.delete(m.getPath());
-            } else if (m instanceof TransactionWrite) {
-                modification.write(m.getPath(), ((TransactionWrite) m).getData());
-            } else if (m instanceof TransactionMerge) {
-                modification.merge(m.getPath(), ((TransactionMerge) m).getData());
-            } else {
-                LOG.warn("{}: ignoring unhandled modification {}", history.persistenceId(), m);
-            }
-        }
-
-        final java.util.Optional<PersistenceProtocol> maybeProto = request.getPersistenceProtocol();
-        if (!maybeProto.isPresent()) {
-            return replyModifySuccess(request.getSequence());
-        }
-
-        switch (maybeProto.get()) {
-            case ABORT:
-                openTransaction.abort();
-                openTransaction = null;
-                return replyModifySuccess(request.getSequence());
-            case SIMPLE:
-                readyCohort = openTransaction.ready();
-                openTransaction = null;
-                directCommit(envelope, now);
-                return null;
-            case THREE_PHASE:
-                readyCohort = openTransaction.ready();
-                openTransaction = null;
-                coordinatedCommit(envelope, now);
-                return null;
-            default:
-                throw new UnsupportedRequestException(request);
-        }
-    }
 }