Cleanup java.util.Optional references
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / FrontendTransaction.java
index e4dd00b602f23fccf578f698c349e3ab9888c103..25cb174ea6a4085f4d767cf9ce62a85b61ae30ef 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ * Copyright (c) 2016, 2017 Cisco Systems, Inc. and others.  All rights reserved.
  *
  * This program and the accompanying materials are made available under the
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
@@ -7,13 +7,16 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
+import com.google.common.base.MoreObjects;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Verify;
 import java.util.ArrayDeque;
-import java.util.Iterator;
+import java.util.Optional;
 import java.util.Queue;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.NotThreadSafe;
+import org.opendaylight.controller.cluster.access.commands.IncrementTransactionSequenceRequest;
+import org.opendaylight.controller.cluster.access.commands.IncrementTransactionSequenceSuccess;
 import org.opendaylight.controller.cluster.access.commands.OutOfOrderRequestException;
 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
 import org.opendaylight.controller.cluster.access.commands.TransactionSuccess;
@@ -22,6 +25,8 @@ import org.opendaylight.controller.cluster.access.concepts.RequestException;
 import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.yangtools.concepts.Identifiable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Frontend common transaction state as observed by the shard leader.
@@ -30,6 +35,8 @@ import org.opendaylight.yangtools.concepts.Identifiable;
  */
 @NotThreadSafe
 abstract class FrontendTransaction implements Identifiable<TransactionIdentifier> {
+    private static final Logger LOG = LoggerFactory.getLogger(FrontendTransaction.class);
+
     private final AbstractFrontendHistory history;
     private final TransactionIdentifier id;
 
@@ -43,6 +50,8 @@ abstract class FrontendTransaction implements Identifiable<TransactionIdentifier
     private Long lastPurgedSequence;
     private long expectedSequence;
 
+    private RequestException previousFailure;
+
     FrontendTransaction(final AbstractFrontendHistory history, final TransactionIdentifier id) {
         this.history = Preconditions.checkNotNull(history);
         this.id = Preconditions.checkNotNull(id);
@@ -57,10 +66,14 @@ abstract class FrontendTransaction implements Identifiable<TransactionIdentifier
         return history;
     }
 
-    final java.util.Optional<TransactionSuccess<?>> replaySequence(final long sequence) throws RequestException {
+    final String persistenceId() {
+        return history().persistenceId();
+    }
+
+    final Optional<TransactionSuccess<?>> replaySequence(final long sequence) throws RequestException {
         // Fast path check: if the requested sequence is the next request, bail early
         if (expectedSequence == sequence) {
-            return java.util.Optional.empty();
+            return Optional.empty();
         }
 
         // Check sequencing: we do not need to bother with future requests
@@ -82,23 +95,21 @@ abstract class FrontendTransaction implements Identifiable<TransactionIdentifier
         // machinery is asynchronous, hence a reply may be in the works and not available.
 
         long replaySequence = firstReplaySequence;
-        final Iterator<?> it = replayQueue.iterator();
-        while (it.hasNext()) {
-            final Object replay = it.next();
+        for (Object replay : replayQueue) {
             if (replaySequence == sequence) {
                 if (replay instanceof RequestException) {
                     throw (RequestException) replay;
                 }
 
                 Verify.verify(replay instanceof TransactionSuccess);
-                return java.util.Optional.of((TransactionSuccess<?>) replay);
+                return Optional.of((TransactionSuccess<?>) replay);
             }
 
             replaySequence++;
         }
 
         // Not found
-        return java.util.Optional.empty();
+        return Optional.empty();
     }
 
     final void purgeSequencesUpTo(final long sequence) {
@@ -107,9 +118,44 @@ abstract class FrontendTransaction implements Identifiable<TransactionIdentifier
         lastPurgedSequence = sequence;
     }
 
-    // Sequence has already been checked
-    abstract @Nullable TransactionSuccess<?> handleRequest(final TransactionRequest<?> request,
-            final RequestEnvelope envelope, final long now) throws RequestException;
+    // Request order has already been checked by caller and replaySequence()
+    @SuppressWarnings("checkstyle:IllegalCatch")
+    @Nullable
+    final TransactionSuccess<?> handleRequest(final TransactionRequest<?> request, final RequestEnvelope envelope,
+            final long now) throws RequestException {
+        if (request instanceof IncrementTransactionSequenceRequest) {
+            final IncrementTransactionSequenceRequest incr = (IncrementTransactionSequenceRequest) request;
+            expectedSequence += incr.getIncrement();
+
+            return recordSuccess(incr.getSequence(),
+                    new IncrementTransactionSequenceSuccess(incr.getTarget(), incr.getSequence()));
+        }
+
+        if (previousFailure != null) {
+            LOG.debug("{}: Rejecting request {} due to previous failure", persistenceId(), request, previousFailure);
+            throw previousFailure;
+        }
+
+        try {
+            return doHandleRequest(request, envelope, now);
+        } catch (RuntimeException e) {
+            /*
+             * The request failed to process, we should not attempt to ever
+             * apply it again. Furthermore we cannot accept any further requests
+             * from this connection, simply because the transaction state is
+             * undefined.
+             */
+            LOG.debug("{}: Request {} failed to process", persistenceId(), request, e);
+            previousFailure = new RuntimeRequestException("Request " + request + " failed to process", e);
+            throw previousFailure;
+        }
+    }
+
+    @Nullable
+    abstract TransactionSuccess<?> doHandleRequest(TransactionRequest<?> request, RequestEnvelope envelope,
+            long now) throws RequestException;
+
+    abstract void retire();
 
     private void recordResponse(final long sequence, final Object response) {
         if (replayQueue.isEmpty()) {
@@ -139,4 +185,12 @@ abstract class FrontendTransaction implements Identifiable<TransactionIdentifier
         recordResponse(envelope.getMessage().getSequence(), failure);
         envelope.sendFailure(failure, executionTime(startTime));
     }
+
+    @Override
+    public final String toString() {
+        return MoreObjects.toStringHelper(this).omitNullValues().add("identifier", getIdentifier())
+                .add("expectedSequence", expectedSequence).add("firstReplaySequence", firstReplaySequence)
+                .add("lastPurgedSequence", lastPurgedSequence)
+                .toString();
+    }
 }