X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FFrontendTransaction.java;h=b5cd5879f74d45ddad6534213d5ae880813207a3;hp=16c180e4ed9bde26a905654bc730a1e3064de825;hb=127042ea7e148d9dc0282acc3780b4754ca69e12;hpb=b66d5a3c59525a1c7885c3d653d9657a99f4103d;ds=sidebyside diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendTransaction.java index 16c180e4ed..b5cd5879f7 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendTransaction.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * Copyright (c) 2016, 2017 Cisco Systems, Inc. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, @@ -7,13 +7,16 @@ */ package org.opendaylight.controller.cluster.datastore; +import com.google.common.base.MoreObjects; import com.google.common.base.Preconditions; import com.google.common.base.Verify; import java.util.ArrayDeque; -import java.util.Iterator; +import java.util.Optional; import java.util.Queue; -import javax.annotation.Nullable; import javax.annotation.concurrent.NotThreadSafe; +import org.eclipse.jdt.annotation.Nullable; +import org.opendaylight.controller.cluster.access.commands.IncrementTransactionSequenceRequest; +import org.opendaylight.controller.cluster.access.commands.IncrementTransactionSequenceSuccess; import org.opendaylight.controller.cluster.access.commands.OutOfOrderRequestException; import org.opendaylight.controller.cluster.access.commands.TransactionRequest; import org.opendaylight.controller.cluster.access.commands.TransactionSuccess; @@ -22,6 +25,8 @@ import org.opendaylight.controller.cluster.access.concepts.RequestException; import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.yangtools.concepts.Identifiable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Frontend common transaction state as observed by the shard leader. @@ -30,6 +35,8 @@ import org.opendaylight.yangtools.concepts.Identifiable; */ @NotThreadSafe abstract class FrontendTransaction implements Identifiable { + private static final Logger LOG = LoggerFactory.getLogger(FrontendTransaction.class); + private final AbstractFrontendHistory history; private final TransactionIdentifier id; @@ -43,6 +50,8 @@ abstract class FrontendTransaction implements Identifiable> replaySequence(final long sequence) throws RequestException { + final String persistenceId() { + return history().persistenceId(); + } + + final Optional> replaySequence(final long sequence) throws RequestException { // Fast path check: if the requested sequence is the next request, bail early if (expectedSequence == sequence) { - return java.util.Optional.empty(); + return Optional.empty(); } // Check sequencing: we do not need to bother with future requests @@ -82,23 +95,21 @@ abstract class FrontendTransaction implements Identifiable it = replayQueue.iterator(); - while (it.hasNext()) { - final Object replay = it.next(); + for (Object replay : replayQueue) { if (replaySequence == sequence) { if (replay instanceof RequestException) { throw (RequestException) replay; } Verify.verify(replay instanceof TransactionSuccess); - return java.util.Optional.of((TransactionSuccess) replay); + return Optional.of((TransactionSuccess) replay); } replaySequence++; } // Not found - return java.util.Optional.empty(); + return Optional.empty(); } final void purgeSequencesUpTo(final long sequence) { @@ -107,9 +118,42 @@ abstract class FrontendTransaction implements Identifiable handleRequest(TransactionRequest request, - RequestEnvelope envelope, long now) throws RequestException; + // Request order has already been checked by caller and replaySequence() + @SuppressWarnings("checkstyle:IllegalCatch") + final @Nullable TransactionSuccess handleRequest(final TransactionRequest request, + final RequestEnvelope envelope, final long now) throws RequestException { + if (request instanceof IncrementTransactionSequenceRequest) { + final IncrementTransactionSequenceRequest incr = (IncrementTransactionSequenceRequest) request; + expectedSequence += incr.getIncrement(); + + return recordSuccess(incr.getSequence(), + new IncrementTransactionSequenceSuccess(incr.getTarget(), incr.getSequence())); + } + + if (previousFailure != null) { + LOG.debug("{}: Rejecting request {} due to previous failure", persistenceId(), request, previousFailure); + throw previousFailure; + } + + try { + return doHandleRequest(request, envelope, now); + } catch (RuntimeException e) { + /* + * The request failed to process, we should not attempt to ever + * apply it again. Furthermore we cannot accept any further requests + * from this connection, simply because the transaction state is + * undefined. + */ + LOG.debug("{}: Request {} failed to process", persistenceId(), request, e); + previousFailure = new RuntimeRequestException("Request " + request + " failed to process", e); + throw previousFailure; + } + } + + abstract @Nullable TransactionSuccess doHandleRequest(TransactionRequest request, RequestEnvelope envelope, + long now) throws RequestException; + + abstract void retire(); private void recordResponse(final long sequence, final Object response) { if (replayQueue.isEmpty()) { @@ -139,4 +183,12 @@ abstract class FrontendTransaction implements Identifiable