/*
* Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
package org.opendaylight.controller.cluster.databroker.actors.dds;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Verify.verify;
import static com.google.common.base.Verify.verifyNotNull;
import static java.util.Objects.requireNonNull;
import akka.actor.ActorRef;
import com.google.common.base.MoreObjects;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.FluentFuture;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Consumer;
import org.checkerframework.checker.lock.qual.GuardedBy;
import org.eclipse.jdt.annotation.NonNull;
import org.eclipse.jdt.annotation.Nullable;
import org.opendaylight.controller.cluster.access.client.ConnectionEntry;
import org.opendaylight.controller.cluster.access.commands.AbstractLocalTransactionRequest;
import org.opendaylight.controller.cluster.access.commands.ClosedTransactionException;
import org.opendaylight.controller.cluster.access.commands.IncrementTransactionSequenceRequest;
import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest;
import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest;
import org.opendaylight.controller.cluster.access.commands.TransactionAbortSuccess;
import org.opendaylight.controller.cluster.access.commands.TransactionCanCommitSuccess;
import org.opendaylight.controller.cluster.access.commands.TransactionCommitSuccess;
import org.opendaylight.controller.cluster.access.commands.TransactionDoCommitRequest;
import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitRequest;
import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitSuccess;
import org.opendaylight.controller.cluster.access.commands.TransactionPurgeRequest;
import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
import org.opendaylight.controller.cluster.access.concepts.Request;
import org.opendaylight.controller.cluster.access.concepts.RequestFailure;
import org.opendaylight.controller.cluster.access.concepts.Response;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.yangtools.concepts.Identifiable;
import org.opendaylight.yangtools.yang.common.Empty;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Class translating transaction operations towards a particular backend shard.
*
*
* This class is not safe to access from multiple application threads, as is usual for transactions. Internal state
* transitions coming from interactions with backend are expected to be thread-safe.
*
*
* This class interacts with the queueing mechanism in ClientActorBehavior, hence once we arrive at a decision
* to use either a local or remote implementation, we are stuck with it. We can re-evaluate on the next transaction.
*/
abstract sealed class AbstractProxyTransaction implements Identifiable
permits LocalProxyTransaction, RemoteProxyTransaction {
/**
* Marker object used instead of read-type of requests, which are satisfied only once. This has a lower footprint
* and allows compressing multiple requests into a single entry. This class is not thread-safe.
*/
private static final class IncrementSequence {
private final long sequence;
private long delta = 0;
IncrementSequence(final long sequence) {
this.sequence = sequence;
}
long getDelta() {
return delta;
}
long getSequence() {
return sequence;
}
void incrementDelta() {
delta++;
}
}
/**
* Base class for representing logical state of this proxy. See individual instantiations and {@link SuccessorState}
* for details.
*/
private static class State {
private final String string;
State(final String string) {
this.string = requireNonNull(string);
}
@Override
public final String toString() {
return string;
}
}
/**
* State class used when a successor has interfered. Contains coordinator latch, the successor and previous state.
* This is a temporary state introduced during reconnection process and is necessary for correct state hand-off
* between the old connection (potentially being accessed by the user) and the new connection (being cleaned up
* by the actor.
*
*
* When a user operation encounters this state, it synchronizes on the it and wait until reconnection completes,
* at which point the request is routed to the successor transaction. This is a relatively heavy-weight solution
* to the problem of state transfer, but the user will observe it only if the race condition is hit.
*/
private static class SuccessorState extends State {
private final CountDownLatch latch = new CountDownLatch(1);
private AbstractProxyTransaction successor;
private State prevState;
// SUCCESSOR + DONE
private boolean done;
SuccessorState() {
super("SUCCESSOR");
}
// Synchronize with succession process and return the successor
AbstractProxyTransaction await() {
try {
latch.await();
} catch (InterruptedException e) {
LOG.warn("Interrupted while waiting for latch of {}", successor);
throw new IllegalStateException(e);
}
return successor;
}
void finish() {
latch.countDown();
}
State getPrevState() {
return verifyNotNull(prevState, "Attempted to access previous state, which was not set");
}
void setPrevState(final State prevState) {
verify(this.prevState == null, "Attempted to set previous state to %s when we already have %s", prevState,
this.prevState);
this.prevState = requireNonNull(prevState);
// We cannot have duplicate successor states, so this check is sufficient
done = DONE.equals(prevState);
}
// To be called from safe contexts, where successor is known to be completed
AbstractProxyTransaction getSuccessor() {
return verifyNotNull(successor);
}
void setSuccessor(final AbstractProxyTransaction successor) {
verify(this.successor == null, "Attempted to set successor to %s when we already have %s", successor,
this.successor);
this.successor = requireNonNull(successor);
}
boolean isDone() {
return done;
}
void setDone() {
done = true;
}
}
private static final Logger LOG = LoggerFactory.getLogger(AbstractProxyTransaction.class);
private static final AtomicIntegerFieldUpdater SEALED_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(AbstractProxyTransaction.class, "sealed");
private static final AtomicReferenceFieldUpdater STATE_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(AbstractProxyTransaction.class, State.class, "state");
/**
* Transaction has been open and is being actively worked on.
*/
private static final State OPEN = new State("OPEN");
/**
* Transaction has been sealed by the user, but it has not completed flushing to the backed, yet. This is
* a transition state, as we are waiting for the user to initiate commit procedures.
*
*
* Since the reconnect mechanics relies on state replay for transactions, this state needs to be flushed into the
* queue to re-create state in successor transaction (which may be based on different messages as locality may have
* changed). Hence the transition to {@link #FLUSHED} state needs to be handled in a thread-safe manner.
*/
private static final State SEALED = new State("SEALED");
/**
* Transaction state has been flushed into the queue, i.e. it is visible by the successor and potentially
* the backend. At this point the transaction does not hold any state besides successful requests, all other state
* is held either in the connection's queue or the successor object.
*
*
* Transition to this state indicates we have all input from the user we need to initiate the correct commit
* protocol.
*/
private static final State FLUSHED = new State("FLUSHED");
/**
* Transaction state has been completely resolved, we have received confirmation of the transaction fate from
* the backend. The only remaining task left to do is finishing up the state cleanup, which is done via purge
* request. We need to hang on to the transaction until that is done, as we have to make sure backend completes
* purging its state -- otherwise we could have a leak on the backend.
*/
private static final State DONE = new State("DONE");
// Touched from client actor thread only
private final Deque