*/
package org.opendaylight.controller.cluster.databroker.actors.dds;
+import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.base.Verify.verify;
+import static com.google.common.base.Verify.verifyNotNull;
+import static java.util.Objects.requireNonNull;
+
import akka.actor.ActorRef;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Verify;
+import com.google.common.collect.ImmutableList;
+import com.google.common.primitives.UnsignedLong;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
-import java.util.function.BiConsumer;
import java.util.function.Consumer;
-import javax.annotation.concurrent.GuardedBy;
+import org.checkerframework.checker.lock.qual.GuardedBy;
+import org.checkerframework.checker.lock.qual.Holding;
+import org.eclipse.jdt.annotation.NonNull;
import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
+import org.opendaylight.controller.cluster.access.client.ClientActorContext;
import org.opendaylight.controller.cluster.access.client.ConnectedClientConnection;
import org.opendaylight.controller.cluster.access.client.ConnectionEntry;
import org.opendaylight.controller.cluster.access.commands.CreateLocalHistoryRequest;
import org.opendaylight.controller.cluster.access.commands.DestroyLocalHistoryRequest;
import org.opendaylight.controller.cluster.access.commands.LocalHistoryRequest;
import org.opendaylight.controller.cluster.access.commands.PurgeLocalHistoryRequest;
+import org.opendaylight.controller.cluster.access.commands.SkipTransactionsRequest;
import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
import org.opendaylight.controller.cluster.access.concepts.Request;
import org.opendaylight.controller.cluster.access.concepts.Response;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.yangtools.concepts.Identifiable;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.ReadOnlyDataTree;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
*/
abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
private abstract static class AbstractLocal extends ProxyHistory {
- private final DataTree dataTree;
+ private final ReadOnlyDataTree dataTree;
AbstractLocal(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
- final LocalHistoryIdentifier identifier, final DataTree dataTree) {
+ final LocalHistoryIdentifier identifier, final ReadOnlyDataTree dataTree) {
super(parent, connection, identifier);
- this.dataTree = Preconditions.checkNotNull(dataTree);
+ this.dataTree = requireNonNull(dataTree);
}
final DataTreeSnapshot takeSnapshot() {
private volatile LocalReadWriteProxyTransaction lastSealed;
Local(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
- final LocalHistoryIdentifier identifier, final DataTree dataTree) {
+ final LocalHistoryIdentifier identifier, final ReadOnlyDataTree dataTree) {
super(parent, connection, identifier, dataTree);
}
@Override
AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
- final TransactionIdentifier txId, final boolean snapshotOnly) {
- Preconditions.checkState(lastOpen == null, "Proxy %s has %s currently open", this, lastOpen);
+ final TransactionIdentifier txId, final boolean snapshotOnly, final boolean isDone) {
+ checkState(lastOpen == null, "Proxy %s has %s currently open", this, lastOpen);
+
+ if (isDone) {
+ // Done transactions do not register on our radar on should not have any state associated.
+ return snapshotOnly ? new LocalReadOnlyProxyTransaction(this, txId)
+ : new LocalReadWriteProxyTransaction(this, txId);
+ }
// onTransactionCompleted() runs concurrently
final LocalReadWriteProxyTransaction localSealed = lastSealed;
@Override
void onTransactionCompleted(final AbstractProxyTransaction tx) {
- Verify.verify(tx instanceof LocalProxyTransaction);
- if (tx instanceof LocalReadWriteProxyTransaction) {
- if (LAST_SEALED_UPDATER.compareAndSet(this, (LocalReadWriteProxyTransaction) tx, null)) {
- LOG.debug("Completed last sealed transaction {}", tx);
- }
+ verify(tx instanceof LocalProxyTransaction, "Unexpected transaction %s", tx);
+ if (tx instanceof LocalReadWriteProxyTransaction
+ && LAST_SEALED_UPDATER.compareAndSet(this, (LocalReadWriteProxyTransaction) tx, null)) {
+ LOG.debug("Completed last sealed transaction {}", tx);
}
}
@Override
void onTransactionSealed(final AbstractProxyTransaction tx) {
- Preconditions.checkState(tx.equals(lastOpen));
+ checkState(tx.equals(lastOpen));
lastSealed = lastOpen;
lastOpen = null;
}
private static final class LocalSingle extends AbstractLocal {
LocalSingle(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
- final LocalHistoryIdentifier identifier, final DataTree dataTree) {
+ final LocalHistoryIdentifier identifier, final ReadOnlyDataTree dataTree) {
super(parent, connection, identifier, dataTree);
}
@Override
AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
- final TransactionIdentifier txId, final boolean snapshotOnly) {
+ final TransactionIdentifier txId, final boolean snapshotOnly, final boolean isDone) {
final DataTreeSnapshot snapshot = takeSnapshot();
return snapshotOnly ? new LocalReadOnlyProxyTransaction(this, txId, snapshot) :
new LocalReadWriteProxyTransaction(this, txId, snapshot);
@Override
AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
- final TransactionIdentifier txId, final boolean snapshotOnly) {
- return new RemoteProxyTransaction(this, txId, snapshotOnly, true);
+ final TransactionIdentifier txId, final boolean snapshotOnly, final boolean isDone) {
+ return new RemoteProxyTransaction(this, txId, snapshotOnly, true, isDone);
}
@Override
@Override
AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
- final TransactionIdentifier txId, final boolean snapshotOnly) {
- return new RemoteProxyTransaction(this, txId, snapshotOnly, false);
+ final TransactionIdentifier txId, final boolean snapshotOnly, final boolean isDone) {
+ return new RemoteProxyTransaction(this, txId, snapshotOnly, false, isDone);
}
@Override
return identifier;
}
- @GuardedBy("lock")
+ @Holding("lock")
@Override
- void replayRequests(final Iterable<ConnectionEntry> previousEntries) {
+ void replayRequests(final Collection<ConnectionEntry> previousEntries) {
// First look for our Create message
- for (ConnectionEntry e : previousEntries) {
+ Iterator<ConnectionEntry> it = previousEntries.iterator();
+ while (it.hasNext()) {
+ final ConnectionEntry e = it.next();
final Request<?, ?> req = e.getRequest();
if (identifier.equals(req.getTarget())) {
- Verify.verify(req instanceof LocalHistoryRequest);
+ verify(req instanceof LocalHistoryRequest, "Unexpected request %s", req);
if (req instanceof CreateLocalHistoryRequest) {
- successor.connection.sendRequest(req, e.getCallback());
+ successor.connection.enqueueRequest(req, e.getCallback(), e.getEnqueuedTicks());
+ it.remove();
break;
}
}
}
for (AbstractProxyTransaction t : proxies.values()) {
- LOG.debug("{} creating successor transaction proxy for {}", identifier, t);
- final AbstractProxyTransaction newProxy = successor.createTransactionProxy(t.getIdentifier(),
- t.isSnapshotOnly());
- LOG.debug("{} created successor transaction proxy {}", identifier, newProxy);
- t.replayMessages(newProxy, previousEntries);
+ LOG.debug("{} replaying messages to old proxy {} towards successor {}", identifier, t, successor);
+ t.replayMessages(successor, previousEntries);
+ }
+
+ // Forward any skipped transactions
+ final var local = skippedTransactions;
+ if (local != null) {
+ LOG.debug("{} forwarding skipped transactions towards successor {}", identifier, successor);
+ successor.skipTransactions(local);
+ skippedTransactions = null;
}
// Now look for any finalizing messages
- for (ConnectionEntry e : previousEntries) {
+ it = previousEntries.iterator();
+ while (it.hasNext()) {
+ final ConnectionEntry e = it.next();
final Request<?, ?> req = e.getRequest();
if (identifier.equals(req.getTarget())) {
- Verify.verify(req instanceof LocalHistoryRequest);
- successor.connection.sendRequest(req, e.getCallback());
+ verify(req instanceof LocalHistoryRequest, "Unexpected request %s", req);
+ if (req instanceof DestroyLocalHistoryRequest) {
+ successor.connection.enqueueRequest(req, e.getCallback(), e.getEnqueuedTicks());
+ it.remove();
+ break;
+ }
}
}
}
- @GuardedBy("lock")
+ @Holding("lock")
@Override
ProxyHistory finishReconnect() {
- final ProxyHistory ret = Verify.verifyNotNull(successor);
+ final ProxyHistory ret = verifyNotNull(successor);
for (AbstractProxyTransaction t : proxies.values()) {
t.finishReconnect();
}
@Override
- void forwardRequest(final Request<?, ?> request, final Consumer<Response<?, ?>> callback,
- final BiConsumer<Request<?, ?>, Consumer<Response<?, ?>>> forwardTo) throws RequestException {
+ void replayEntry(final ConnectionEntry entry, final Consumer<ConnectionEntry> replayTo)
+ throws RequestException {
+ final Request<?, ?> request = entry.getRequest();
if (request instanceof TransactionRequest) {
- forwardTransactionRequest((TransactionRequest<?>) request, callback);
+ lookupProxy(request).replayRequest((TransactionRequest<?>) request, entry.getCallback(),
+ entry.getEnqueuedTicks());
} else if (request instanceof LocalHistoryRequest) {
- forwardTo.accept(request, callback);
+ replayTo.accept(entry);
} else {
throw new IllegalArgumentException("Unhandled request " + request);
}
}
- private void forwardTransactionRequest(final TransactionRequest<?> request,
- final Consumer<Response<?, ?>> callback) throws RequestException {
+ @Override
+ void forwardEntry(final ConnectionEntry entry, final Consumer<ConnectionEntry> forwardTo)
+ throws RequestException {
+ final Request<?, ?> request = entry.getRequest();
+ if (request instanceof TransactionRequest) {
+ lookupProxy(request).forwardRequest((TransactionRequest<?>) request, entry.getCallback());
+ } else if (request instanceof LocalHistoryRequest) {
+ forwardTo.accept(entry);
+ } else {
+ throw new IllegalArgumentException("Unhandled request " + request);
+ }
+ }
+ private AbstractProxyTransaction lookupProxy(final Request<?, ?> request)
+ throws RequestReplayException {
final AbstractProxyTransaction proxy;
lock.lock();
try {
} finally {
lock.unlock();
}
- if (proxy == null) {
- throw new RequestReplayException("Failed to find proxy for %s", request);
+ if (proxy != null) {
+ return proxy;
}
- proxy.forwardRequest(request, callback);
+ throw new RequestReplayException("Failed to find proxy for %s", request);
}
}
private static final Logger LOG = LoggerFactory.getLogger(ProxyHistory.class);
private final Lock lock = new ReentrantLock();
- private final LocalHistoryIdentifier identifier;
- private final AbstractClientConnection<ShardBackendInfo> connection;
- private final AbstractClientHistory parent;
+ private final @NonNull LocalHistoryIdentifier identifier;
+ private final @NonNull AbstractClientConnection<ShardBackendInfo> connection;
+ private final @NonNull AbstractClientHistory parent;
@GuardedBy("lock")
private final Map<TransactionIdentifier, AbstractProxyTransaction> proxies = new LinkedHashMap<>();
@GuardedBy("lock")
private ProxyHistory successor;
+ // List of transaction identifiers which were allocated by our parent history, but did not touch our shard. Each of
+ // these represents a hole in otherwise-contiguous allocation of transactionIds. These holes are problematic, as
+ // each of them prevents LeaderFrontendState.purgedHistories from coalescing, leading to a gradual heap exhaustion.
+ //
+ // <p>
+ // We keep these in an ArrayList for fast insertion, as that happens when we are otherwise idle. We translate these
+ // into purge requests when:
+ // - we are about to allocate a new transaction
+ // - we get a successor proxy
+ // - the list grows unreasonably long
+ //
+ // TODO: we are tracking entire TransactionIdentifiers, but really only need to track the longs. Do that once we
+ // have a {@code List<long>}.
+ // FIXME: this is not tuneable, but perhaps should be
+ // FIXME: default value deserves some explanation -- this affects depth of an RB Tree on the receiving end.
+ private static final int PURGE_SKIPPED_TXID_THRESHOLD = 256;
+
+ @GuardedBy("lock")
+ private volatile List<TransactionIdentifier> skippedTransactions;
+
private ProxyHistory(final AbstractClientHistory parent,
final AbstractClientConnection<ShardBackendInfo> connection, final LocalHistoryIdentifier identifier) {
- this.parent = Preconditions.checkNotNull(parent);
- this.connection = Preconditions.checkNotNull(connection);
- this.identifier = Preconditions.checkNotNull(identifier);
+ this.parent = requireNonNull(parent);
+ this.connection = requireNonNull(connection);
+ this.identifier = requireNonNull(identifier);
}
static ProxyHistory createClient(final AbstractClientHistory parent,
final AbstractClientConnection<ShardBackendInfo> connection, final LocalHistoryIdentifier identifier) {
- final Optional<DataTree> dataTree = connection.getBackendInfo().flatMap(ShardBackendInfo::getDataTree);
+ final Optional<ReadOnlyDataTree> dataTree = connection.getBackendInfo().flatMap(ShardBackendInfo::getDataTree);
return dataTree.isPresent() ? new Local(parent, connection, identifier, dataTree.get())
: new Remote(parent, connection, identifier);
}
static ProxyHistory createSingle(final AbstractClientHistory parent,
final AbstractClientConnection<ShardBackendInfo> connection,
final LocalHistoryIdentifier identifier) {
- final Optional<DataTree> dataTree = connection.getBackendInfo().flatMap(ShardBackendInfo::getDataTree);
+ final Optional<ReadOnlyDataTree> dataTree = connection.getBackendInfo().flatMap(ShardBackendInfo::getDataTree);
return dataTree.isPresent() ? new LocalSingle(parent, connection, identifier, dataTree.get())
: new RemoteSingle(parent, connection, identifier);
}
@Override
+ // Non-final for mocking
public LocalHistoryIdentifier getIdentifier() {
return identifier;
}
+ final ClientActorContext context() {
+ return connection.context();
+ }
+
+ final long currentTime() {
+ return connection.currentTime();
+ }
+
final ActorRef localActor() {
return connection.localActor();
}
final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId,
final boolean snapshotOnly) {
+ return createTransactionProxy(txId, snapshotOnly, false);
+ }
+
+ // Non-final for mocking
+ AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId, final boolean snapshotOnly,
+ final boolean isDone) {
lock.lock();
try {
if (successor != null) {
- return successor.createTransactionProxy(txId, snapshotOnly);
+ return successor.createTransactionProxy(txId, snapshotOnly, isDone);
}
final TransactionIdentifier proxyId = new TransactionIdentifier(identifier, txId.getTransactionId());
- final AbstractProxyTransaction ret = doCreateTransactionProxy(connection, proxyId, snapshotOnly);
+ final AbstractProxyTransaction ret = doCreateTransactionProxy(connection, proxyId, snapshotOnly, isDone);
proxies.put(proxyId, ret);
LOG.debug("Allocated proxy {} for transaction {}", proxyId, txId);
return ret;
}
}
+ final void skipTransaction(final TransactionIdentifier txId) {
+ lock.lock();
+ try {
+ if (successor != null) {
+ successor.skipTransaction(txId);
+ return;
+ }
+
+ var local = skippedTransactions;
+ if (local == null) {
+ skippedTransactions = local = new ArrayList<>();
+ }
+ local.add(txId);
+ LOG.debug("Recorded skipped transaction {}", txId);
+ skipIfNeeded(local);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Holding("lock")
+ private void skipIfNeeded(final List<TransactionIdentifier> current) {
+ if (current.size() >= PURGE_SKIPPED_TXID_THRESHOLD) {
+ skippedTransactions = null;
+ doSkipTransactions(current);
+ }
+ }
+
+ private void skipTransactions(final List<TransactionIdentifier> toSkip) {
+ lock.lock();
+ try {
+ if (successor != null) {
+ successor.skipTransactions(toSkip);
+ return;
+ }
+
+ var local = skippedTransactions;
+ if (local != null) {
+ local.addAll(toSkip);
+ } else {
+ skippedTransactions = local = toSkip;
+ }
+ skipIfNeeded(local);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ private void skipTransactions() {
+ var local = skippedTransactions;
+ if (local != null) {
+ lock.lock();
+ try {
+ local = skippedTransactions;
+ if (local != null && successor == null) {
+ skippedTransactions = null;
+ doSkipTransactions(local);
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+ }
+
+ @Holding("lock")
+ private void doSkipTransactions(final List<TransactionIdentifier> toSkip) {
+ final var txIds = toSkip.stream()
+ .mapToLong(TransactionIdentifier::getTransactionId)
+ .distinct()
+ .sorted()
+ .mapToObj(UnsignedLong::fromLongBits)
+ .collect(ImmutableList.toImmutableList());
+
+ LOG.debug("Proxy {} skipping transactions {}", this, txIds);
+ connection.enqueueRequest(new SkipTransactionsRequest(new TransactionIdentifier(identifier,
+ txIds.get(0).longValue()), 0, localActor(),txIds.subList(1, txIds.size())), resp -> {
+ LOG.debug("Proxy {} confirmed transaction skip", this);
+ }, connection.currentTime());
+ }
+
final void abortTransaction(final AbstractProxyTransaction tx) {
lock.lock();
try {
- proxies.remove(tx.getIdentifier());
- LOG.debug("Proxy {} aborting transaction {}", this, tx);
+ // Removal will be completed once purge completes
+ LOG.debug("Proxy {} aborted transaction {}", this, tx);
onTransactionAborted(tx);
} finally {
lock.unlock();
final void completeTransaction(final AbstractProxyTransaction tx) {
lock.lock();
try {
- proxies.remove(tx.getIdentifier());
+ // Removal will be completed once purge completes
LOG.debug("Proxy {} completing transaction {}", this, tx);
onTransactionCompleted(tx);
} finally {
}
}
+ final void purgeTransaction(final AbstractProxyTransaction tx) {
+ lock.lock();
+ try {
+ proxies.remove(tx.getIdentifier());
+ LOG.debug("Proxy {} purged transaction {}", this, tx);
+ } finally {
+ lock.unlock();
+ }
+ }
+
final void close() {
lock.lock();
try {
}
}
+ final void enqueueRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback,
+ final long enqueuedTicks) {
+ skipTransactions();
+ connection.enqueueRequest(request, callback, enqueuedTicks);
+ }
+
final void sendRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
+ skipTransactions();
connection.sendRequest(request, callback);
}
- @GuardedBy("lock")
+ @Holding("lock")
+ @SuppressWarnings("checkstyle:hiddenField")
abstract AbstractProxyTransaction doCreateTransactionProxy(AbstractClientConnection<ShardBackendInfo> connection,
- TransactionIdentifier txId, boolean snapshotOnly);
+ TransactionIdentifier txId, boolean snapshotOnly, boolean isDone);
+ @Holding("lock")
+ @SuppressWarnings("checkstyle:hiddenField")
abstract ProxyHistory createSuccessor(AbstractClientConnection<ShardBackendInfo> connection);
@SuppressFBWarnings(value = "UL_UNRELEASED_LOCK", justification = "Lock is released asynchronously via the cohort")
- ProxyReconnectCohort startReconnect(final ConnectedClientConnection<ShardBackendInfo> newConnection) {
+ final ProxyReconnectCohort startReconnect(final ConnectedClientConnection<ShardBackendInfo> newConnection) {
lock.lock();
if (successor != null) {
lock.unlock();
LOG.debug("Proxy {} purge completed with {}", this, response);
}
- @GuardedBy("lock")
+ @Holding("lock")
void onTransactionAborted(final AbstractProxyTransaction tx) {
// No-op for most implementations
}
- @GuardedBy("lock")
+ @Holding("lock")
void onTransactionCompleted(final AbstractProxyTransaction tx) {
// No-op for most implementations
}
+ @Holding("lock")
void onTransactionSealed(final AbstractProxyTransaction tx) {
// No-op on most implementations
}