package org.opendaylight.controller.cluster.databroker.actors.dds;
import com.google.common.base.Preconditions;
+import com.google.common.base.Verify;
import java.util.HashMap;
import java.util.Map;
-import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import javax.annotation.concurrent.GuardedBy;
+import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
+import org.opendaylight.controller.cluster.access.client.ConnectedClientConnection;
+import org.opendaylight.controller.cluster.access.client.ConnectionEntry;
+import org.opendaylight.controller.cluster.access.client.InversibleLockException;
+import org.opendaylight.controller.cluster.access.commands.CreateLocalHistoryRequest;
import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.Response;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.mdsal.common.api.TransactionChainClosedException;
import org.opendaylight.yangtools.concepts.Identifiable;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@GuardedBy("this")
private final Map<TransactionIdentifier, AbstractTransactionCommitCohort> readyTransactions = new HashMap<>();
- private final Map<Long, AbstractProxyHistory> histories = new ConcurrentHashMap<>();
- private final DistributedDataStoreClientBehavior client;
+ private final Map<Long, ProxyHistory> histories = new ConcurrentHashMap<>();
+ private final AbstractDataStoreClientBehavior client;
private final LocalHistoryIdentifier identifier;
// Used via NEXT_TX_UPDATER
private volatile State state = State.IDLE;
- AbstractClientHistory(final DistributedDataStoreClientBehavior client, final LocalHistoryIdentifier identifier) {
+ AbstractClientHistory(final AbstractDataStoreClientBehavior client, final LocalHistoryIdentifier identifier) {
this.client = Preconditions.checkNotNull(client);
this.identifier = Preconditions.checkNotNull(identifier);
Preconditions.checkArgument(identifier.getCookie() == 0);
final void updateState(final State expected, final State next) {
final boolean success = STATE_UPDATER.compareAndSet(this, expected, next);
Preconditions.checkState(success, "Race condition detected, state changed from %s to %s", expected, state);
+ LOG.debug("Client history {} changed state from {} to {}", this, expected, next);
}
@Override
return identifier;
}
- final DistributedDataStoreClientBehavior getClient() {
- return client;
- }
-
final long nextTx() {
return NEXT_TX_UPDATER.getAndIncrement(this);
}
+ final Long resolveShardForPath(final YangInstanceIdentifier path) {
+ return client.resolveShardForPath(path);
+ }
+
@Override
final void localAbort(final Throwable cause) {
final State oldState = STATE_UPDATER.getAndSet(this, State.CLOSED);
}
}
- private AbstractProxyHistory createHistoryProxy(final Long shard) {
- return createHistoryProxy(new LocalHistoryIdentifier(identifier.getClientId(),
- identifier.getHistoryId(), shard), client.resolver().getFutureBackendInfo(shard));
+ /**
+ * Create a new history proxy for a given shard.
+ *
+ * @throws InversibleLockException if the shard is being reconnected
+ */
+ private ProxyHistory createHistoryProxy(final Long shard) {
+ final AbstractClientConnection<ShardBackendInfo> connection = client.getConnection(shard);
+ final LocalHistoryIdentifier proxyId = new LocalHistoryIdentifier(identifier.getClientId(),
+ identifier.getHistoryId(), shard);
+ LOG.debug("Created proxyId {} for history {} shard {}", proxyId, identifier, shard);
+
+ final ProxyHistory ret = createHistoryProxy(proxyId, connection);
+
+ // Request creation of the history, if it is not the single history
+ if (ret.getIdentifier().getHistoryId() != 0) {
+ connection.sendRequest(new CreateLocalHistoryRequest(ret.getIdentifier(), connection.localActor()),
+ this::createHistoryCallback);
+ }
+ return ret;
}
- abstract AbstractProxyHistory createHistoryProxy(final LocalHistoryIdentifier historyId,
- final Optional<ShardBackendInfo> backendInfo);
+ abstract ProxyHistory createHistoryProxy(final LocalHistoryIdentifier historyId,
+ final AbstractClientConnection<ShardBackendInfo> connection);
+
+ private void createHistoryCallback(final Response<?, ?> response) {
+ LOG.debug("Create history response {}", response);
+ }
final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier transactionId, final Long shard) {
- final AbstractProxyHistory history = histories.computeIfAbsent(shard, this::createHistoryProxy);
- return history.createTransactionProxy(transactionId);
+ while (true) {
+ final ProxyHistory history;
+ try {
+ history = histories.computeIfAbsent(shard, this::createHistoryProxy);
+ } catch (InversibleLockException e) {
+ LOG.trace("Waiting for transaction {} shard {} connection to resolve", transactionId, shard);
+ e.awaitResolution();
+ LOG.trace("Retrying transaction {} shard {} connection", transactionId, shard);
+ continue;
+ }
+
+ return history.createTransactionProxy(transactionId);
+ }
}
+ /**
+ * Allocate a {@link ClientTransaction}.
+ *
+ * @return A new {@link ClientTransaction}
+ * @throws TransactionChainClosedException if this history is closed
+ */
public final ClientTransaction createTransaction() {
- Preconditions.checkState(state != State.CLOSED);
+ if (state == State.CLOSED) {
+ throw new TransactionChainClosedException(String.format("Local history %s is closed", identifier));
+ }
synchronized (this) {
final ClientTransaction ret = doCreateTransaction();
Preconditions.checkState(previous == null, "Duplicate cohort %s for transaction %s, already have %s",
cohort, txId, previous);
+ LOG.debug("Local history {} readied transaction {}", this, txId);
return cohort;
}
LOG.warn("Could not find completed transaction {}", txId);
}
}
+
+ HistoryReconnectCohort startReconnect(final ConnectedClientConnection<ShardBackendInfo> newConn) {
+ final ProxyHistory oldProxy = histories.get(newConn.cookie());
+ if (oldProxy == null) {
+ return null;
+ }
+
+ final ProxyReconnectCohort proxy = Verify.verifyNotNull(oldProxy.startReconnect(newConn));
+ return new HistoryReconnectCohort() {
+ @Override
+ ProxyReconnectCohort getProxy() {
+ return proxy;
+ }
+
+ @Override
+ void replaySuccessfulRequests(final Iterable<ConnectionEntry> previousEntries) {
+ proxy.replaySuccessfulRequests(previousEntries);
+ }
+
+ @Override
+ public void close() {
+ LOG.debug("Client history {} finishing reconnect to {}", AbstractClientHistory.this, newConn);
+ final ProxyHistory newProxy = proxy.finishReconnect();
+ if (!histories.replace(newConn.cookie(), oldProxy, newProxy)) {
+ LOG.warn("Failed to replace proxy {} with {} in {}", oldProxy, newProxy,
+ AbstractClientHistory.this);
+ }
+ }
+ };
+ }
}