/*
* Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
package org.opendaylight.controller.cluster.databroker.actors.dds;
import akka.actor.ActorRef;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.base.Verify;
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Consumer;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.NotThreadSafe;
import org.opendaylight.controller.cluster.access.client.ConnectionEntry;
import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest;
import org.opendaylight.controller.cluster.access.commands.TransactionAbortSuccess;
import org.opendaylight.controller.cluster.access.commands.TransactionCanCommitSuccess;
import org.opendaylight.controller.cluster.access.commands.TransactionCommitSuccess;
import org.opendaylight.controller.cluster.access.commands.TransactionDoCommitRequest;
import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitRequest;
import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitSuccess;
import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
import org.opendaylight.controller.cluster.access.concepts.Request;
import org.opendaylight.controller.cluster.access.concepts.RequestFailure;
import org.opendaylight.controller.cluster.access.concepts.Response;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.mdsal.common.api.ReadFailedException;
import org.opendaylight.yangtools.concepts.Identifiable;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Class translating transaction operations towards a particular backend shard.
*
*
* This class is not safe to access from multiple application threads, as is usual for transactions. Internal state
* transitions coming from interactions with backend are expected to be thread-safe.
*
*
* This class interacts with the queueing mechanism in ClientActorBehavior, hence once we arrive at a decision
* to use either a local or remote implementation, we are stuck with it. We can re-evaluate on the next transaction.
*
* @author Robert Varga
*/
abstract class AbstractProxyTransaction implements Identifiable {
/**
* Marker object used instead of read-type of requests, which are satisfied only once. This has a lower footprint
* and allows compressing multiple requests into a single entry.
*/
@NotThreadSafe
private static final class IncrementSequence {
private long delta = 1;
long getDelta() {
return delta;
}
void incrementDelta() {
delta++;
}
}
// Generic state base class. Direct instances are used for fast paths, sub-class is used for successor transitions
private static class State {
private final String string;
State(final String string) {
this.string = Preconditions.checkNotNull(string);
}
@Override
public final String toString() {
return string;
}
}
// State class used when a successor has interfered. Contains coordinator latch, the successor and previous state
private static final class SuccessorState extends State {
private final CountDownLatch latch = new CountDownLatch(1);
private AbstractProxyTransaction successor;
private State prevState;
SuccessorState() {
super("successor");
}
// Synchronize with succession process and return the successor
AbstractProxyTransaction await() {
try {
latch.await();
} catch (InterruptedException e) {
LOG.warn("Interrupted while waiting for latch of {}", successor);
throw Throwables.propagate(e);
}
return successor;
}
void finish() {
latch.countDown();
}
State getPrevState() {
return prevState;
}
void setPrevState(final State prevState) {
Verify.verify(this.prevState == null);
this.prevState = Preconditions.checkNotNull(prevState);
}
// To be called from safe contexts, where successor is known to be completed
AbstractProxyTransaction getSuccessor() {
return Verify.verifyNotNull(successor);
}
void setSuccessor(final AbstractProxyTransaction successor) {
Verify.verify(this.successor == null);
this.successor = Preconditions.checkNotNull(successor);
}
}
private static final Logger LOG = LoggerFactory.getLogger(AbstractProxyTransaction.class);
private static final AtomicIntegerFieldUpdater SEALED_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(AbstractProxyTransaction.class, "sealed");
private static final AtomicReferenceFieldUpdater STATE_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(AbstractProxyTransaction.class, State.class, "state");
private static final State OPEN = new State("open");
private static final State SEALED = new State("sealed");
private static final State FLUSHED = new State("flushed");
// Touched from client actor thread only
private final Deque