package org.opendaylight.controller.cluster.datastore;
import javax.annotation.Nonnull;
-import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
this(transactionIdentifier, DataStoreVersions.CURRENT_VERSION);
}
- protected AbstractTransactionContext(TransactionIdentifier transactionIdentifier,
- short transactionVersion) {
+ protected AbstractTransactionContext(TransactionIdentifier transactionIdentifier, short transactionVersion) {
this.transactionIdentifier = transactionIdentifier;
this.transactionVersion = transactionVersion;
}
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import javax.annotation.Nonnull;
+import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
import org.opendaylight.controller.cluster.access.concepts.MemberName;
-import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
*/
abstract class AbstractTransactionContextFactory<F extends LocalTransactionFactory> implements AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(AbstractTransactionContextFactory.class);
- private static final MemberName UNKNOWN_MEMBER = MemberName.forName("UNKNOWN-MEMBER");
-
- protected static final AtomicLong TX_COUNTER = new AtomicLong();
+ @SuppressWarnings("rawtypes")
+ private static final AtomicLongFieldUpdater<AbstractTransactionContextFactory> TX_COUNTER_UPDATER =
+ AtomicLongFieldUpdater.newUpdater(AbstractTransactionContextFactory.class, "nextTx");
private final ConcurrentMap<String, F> knownLocal = new ConcurrentHashMap<>();
+ private final LocalHistoryIdentifier historyId;
private final ActorContext actorContext;
- protected AbstractTransactionContextFactory(final ActorContext actorContext) {
+ // Used via TX_COUNTER_UPDATER
+ @SuppressWarnings("unused")
+ private volatile long nextTx;
+
+ protected AbstractTransactionContextFactory(final ActorContext actorContext,
+ final LocalHistoryIdentifier historyId) {
this.actorContext = Preconditions.checkNotNull(actorContext);
+ this.historyId = Preconditions.checkNotNull(historyId);
}
final ActorContext getActorContext() {
return actorContext;
}
+ final LocalHistoryIdentifier getHistoryId() {
+ return historyId;
+ }
+
private TransactionContext maybeCreateLocalTransactionContext(final TransactionProxy parent, final String shardName) {
final LocalTransactionFactory local = knownLocal.get(shardName);
if (local != null) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("Tx {} - Creating local component for shard {} using factory {}",
- parent.getIdentifier(), shardName, local);
- }
+ LOG.debug("Tx {} - Creating local component for shard {} using factory {}", parent.getIdentifier(),
+ shardName, local);
try {
return createLocalTransactionContext(local, parent);
}
}
- protected MemberName getMemberName() {
- final MemberName ret = getActorContext().getCurrentMemberName();
- return ret == null ? UNKNOWN_MEMBER : ret;
+ protected final MemberName getMemberName() {
+ return historyId.getClientId().getFrontendId().getMemberName();
}
/**
* factory.
* @return Transaction identifier, may not be null.
*/
- protected abstract TransactionIdentifier nextIdentifier();
+ protected final TransactionIdentifier nextIdentifier() {
+ return new TransactionIdentifier(historyId, TX_COUNTER_UPDATER.getAndIncrement(this));
+ }
/**
* Find the primary shard actor.
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.List;
-import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.Future;
this.waitTillReadyTimeInMillis =
actorContext.getDatastoreContext().getShardLeaderElectionTimeout().duration().toMillis() * READY_WAIT_FACTOR;
- this.txContextFactory = TransactionContextFactory.create(actorContext);
+ this.txContextFactory = new TransactionContextFactory(actorContext, identifier);
datastoreConfigMXBean = new DatastoreConfigurationMXBeanImpl(
datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType());
this.actorContext = Preconditions.checkNotNull(actorContext, "actorContext should not be null");
this.client = null;
this.identifier = Preconditions.checkNotNull(identifier);
- this.txContextFactory = TransactionContextFactory.create(actorContext);
+ this.txContextFactory = new TransactionContextFactory(actorContext, identifier);
this.waitTillReadyTimeInMillis =
actorContext.getDatastoreContext().getShardLeaderElectionTimeout().duration().toMillis() * READY_WAIT_FACTOR;
}
import akka.dispatch.OnComplete;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ListenableFuture;
-import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.controller.cluster.datastore.utils.TransactionIdentifierUtils;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
import org.opendaylight.controller.sal.core.spi.data.SnapshotBackedWriteTransaction;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
return Futures.failed(operationError);
}
- final ReadyLocalTransaction message = new ReadyLocalTransaction(transaction.getIdentifier().toString(),
- modification, immediate);
+ final ReadyLocalTransaction message = new ReadyLocalTransaction(
+ TransactionIdentifierUtils.actorNameFor(transaction.getIdentifier()), modification, immediate);
return actorContext.executeOperationAsync(leader, message, actorContext.getTransactionCommitOperationTimeout());
}
import com.google.common.base.Preconditions;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
-import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.sal.core.spi.data.AbstractSnapshotBackedTransactionChain;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
}
@Override
- protected DOMStoreThreePhaseCommitCohort createCohort(final SnapshotBackedWriteTransaction<TransactionIdentifier> transaction, final DataTreeModification modification) {
+ protected DOMStoreThreePhaseCommitCohort createCohort(
+ final SnapshotBackedWriteTransaction<TransactionIdentifier> transaction,
+ final DataTreeModification modification) {
return new LocalChainThreePhaseCommitCohort(transaction, modification);
}
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.SettableFuture;
-import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.AbstractRead;
import org.opendaylight.controller.cluster.datastore.modification.AbstractModification;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
*/
package org.opendaylight.controller.cluster.datastore;
-import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
import com.google.common.base.Preconditions;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
-import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
import akka.actor.ActorSelection;
import com.google.common.util.concurrent.SettableFuture;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
-import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.AbstractRead;
import org.opendaylight.controller.cluster.datastore.modification.AbstractModification;
import org.opendaylight.controller.md.sal.common.api.data.DataStoreUnavailableException;
import com.google.common.base.Preconditions;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
-import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import akka.util.Timeout;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.SettableFuture;
-import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.AbstractRead;
import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
import org.opendaylight.controller.cluster.datastore.modification.AbstractModification;
import org.opendaylight.controller.cluster.datastore.modification.Modification;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.controller.cluster.datastore.utils.TransactionIdentifierUtils;
import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
}
private BatchedModifications newBatchedModifications() {
- return new BatchedModifications(getIdentifier().toString(), getTransactionVersion(),
- getIdentifier().getChainId());
+ return new BatchedModifications(TransactionIdentifierUtils.actorNameFor(getIdentifier()),
+ getTransactionVersion(), RemoteTransactionContextSupport.compatTransactionChainId(getIdentifier()));
}
private void batchModification(Modification modification) {
import akka.util.Timeout;
import com.google.common.base.Preconditions;
import java.util.concurrent.TimeUnit;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
import org.opendaylight.controller.cluster.datastore.exceptions.ShardLeaderNotRespondingException;
-import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.controller.cluster.datastore.utils.TransactionIdentifierUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.Future;
}
}
+ /**
+ * @deprecated Temporary utility for extracting transaction chain ID from a {@link TransactionIdentifier}
+ */
+ @Deprecated
+ static String compatTransactionChainId(final TransactionIdentifier txId) {
+ final long historyId = txId.getHistoryId().getHistoryId();
+ return historyId == 0 ? "" : Long.toUnsignedString(historyId);
+ }
+
/**
* Performs a CreateTransaction try async.
*/
primaryShardInfo.getPrimaryShardActor());
}
- Object serializedCreateMessage = new CreateTransaction(getIdentifier().toString(),
- getTransactionType().ordinal(), getIdentifier().getChainId(),
+ Object serializedCreateMessage = new CreateTransaction(TransactionIdentifierUtils.actorNameFor(getIdentifier()),
+ getTransactionType().ordinal(), compatTransactionChainId(getIdentifier()),
primaryShardInfo.getPrimaryShardVersion()).toSerializable();
Future<Object> createTxFuture = getActorContext().executeOperationAsync(
package org.opendaylight.controller.cluster.datastore;
import akka.dispatch.OnComplete;
+import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import java.util.Arrays;
import java.util.List;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
import org.slf4j.Logger;
private final ActorContext actorContext;
private final Future<Object> cohortFuture;
- private final String transactionId;
+ private final TransactionIdentifier transactionId;
private volatile DOMStoreThreePhaseCommitCohort delegateCohort = NoOpDOMStoreThreePhaseCommitCohort.INSTANCE;
private final OperationCallback.Reference operationCallbackRef;
- SingleCommitCohortProxy(ActorContext actorContext, Future<Object> cohortFuture, String transactionId,
+ SingleCommitCohortProxy(ActorContext actorContext, Future<Object> cohortFuture, TransactionIdentifier transactionId,
OperationCallback.Reference operationCallbackRef) {
this.actorContext = actorContext;
this.cohortFuture = cohortFuture;
- this.transactionId = transactionId;
+ this.transactionId = Preconditions.checkNotNull(transactionId);
this.operationCallbackRef = operationCallbackRef;
}
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Function;
import javax.annotation.Nonnull;
-import org.opendaylight.controller.cluster.datastore.identifiers.TransactionChainIdentifier;
-import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
import org.opendaylight.controller.md.sal.common.api.data.TransactionChainClosedException;
};
private static final Logger LOG = LoggerFactory.getLogger(TransactionChainProxy.class);
- private static final AtomicInteger CHAIN_COUNTER = new AtomicInteger();
private static final AtomicReferenceFieldUpdater<TransactionChainProxy, State> STATE_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(TransactionChainProxy.class, State.class, "currentState");
- private final TransactionChainIdentifier transactionChainId;
private final TransactionContextFactory parent;
private volatile State currentState = IDLE_STATE;
*/
private final ConcurrentMap<TransactionIdentifier, Promise<Object>> priorReadOnlyTxPromises = new ConcurrentHashMap<>();
- TransactionChainProxy(final TransactionContextFactory parent) {
- super(parent.getActorContext());
-
- transactionChainId = new TransactionChainIdentifier(parent.getActorContext().getCurrentMemberName(), CHAIN_COUNTER.incrementAndGet());
+ TransactionChainProxy(final TransactionContextFactory parent, final LocalHistoryIdentifier historyId) {
+ super(parent.getActorContext(), historyId);
this.parent = parent;
}
public String getTransactionChainId() {
- return transactionChainId.toString();
+ return getHistoryId().toString();
}
@Override
getActorContext().broadcast(new Function<Short, Object>() {
@Override
public Object apply(Short version) {
- return new CloseTransactionChain(transactionChainId.toString(), version).toSerializable();
+ return new CloseTransactionChain(getHistoryId().toString(), version).toSerializable();
}
});
}
promise.success(null);
}
}
-
- @Override
- protected TransactionIdentifier nextIdentifier() {
- return transactionChainId.newTransactionIdentifier();
- }
}
import akka.actor.ActorSelection;
import java.util.Collection;
-import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
+import java.util.concurrent.atomic.AtomicLong;
+import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
* transactions (ie not chained).
*/
final class TransactionContextFactory extends AbstractTransactionContextFactory<LocalTransactionFactoryImpl> {
+ private final AtomicLong nextHistory = new AtomicLong(1);
- private TransactionContextFactory(final ActorContext actorContext) {
- super(actorContext);
- }
-
- static TransactionContextFactory create(final ActorContext actorContext) {
- return new TransactionContextFactory(actorContext);
+ TransactionContextFactory(final ActorContext actorContext, final ClientIdentifier clientId) {
+ super(actorContext, new LocalHistoryIdentifier(clientId, 0));
}
@Override
public void close() {
}
- @Override
- protected TransactionIdentifier nextIdentifier() {
- return TransactionIdentifier.create(getMemberName(), TX_COUNTER.getAndIncrement());
- }
-
@Override
protected LocalTransactionFactoryImpl factoryForShard(final String shardName, final ActorSelection shardLeader, final DataTree dataTree) {
return new LocalTransactionFactoryImpl(getActorContext(), shardLeader, dataTree);
}
DOMStoreTransactionChain createTransactionChain() {
- return new TransactionChainProxy(this);
+ return new TransactionChainProxy(this, new LocalHistoryIdentifier(getHistoryId().getClientId(),
+ nextHistory.getAndIncrement()));
}
@Override
- protected void onTransactionContextCreated(TransactionIdentifier transactionId) {
+ protected void onTransactionContextCreated(final TransactionIdentifier transactionId) {
}
}
import java.util.List;
import java.util.concurrent.TimeUnit;
import javax.annotation.concurrent.GuardedBy;
-import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private final OperationLimiter limiter;
- TransactionContextWrapper(TransactionIdentifier identifier, final ActorContext actorContext) {
+ TransactionContextWrapper(final TransactionIdentifier identifier, final ActorContext actorContext) {
this.identifier = Preconditions.checkNotNull(identifier);
this.limiter = new OperationLimiter(identifier,
actorContext.getDatastoreContext().getShardBatchedModificationCount() + 1, // 1 extra permit for the ready operation
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
-import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.AbstractRead;
import org.opendaylight.controller.cluster.datastore.messages.DataExists;
import org.opendaylight.controller.cluster.datastore.messages.ReadData;
import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.controller.cluster.datastore.utils.NormalizedNodeAggregator;
+import org.opendaylight.controller.cluster.datastore.utils.TransactionIdentifierUtils;
import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.opendaylight.controller.sal.core.spi.data.AbstractDOMStoreTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
future = getDirectCommitFuture(transactionContext, operationCallbackRef);
}
- return new SingleCommitCohortProxy(txContextFactory.getActorContext(), future, getIdentifier().toString(),
- operationCallbackRef);
+ return new SingleCommitCohortProxy(txContextFactory.getActorContext(), future, getIdentifier(),
+ operationCallbackRef);
}
private Future<?> getDirectCommitFuture(TransactionContext transactionContext,
}
return new ThreePhaseCommitCohortProxy(txContextFactory.getActorContext(), cohorts,
- getIdentifier().toString());
+ TransactionIdentifierUtils.actorNameFor(getIdentifier()));
}
private String shardNameFromIdentifier(final YangInstanceIdentifier path) {
import akka.actor.ActorSelection;
import akka.dispatch.Mapper;
import com.google.common.base.Preconditions;
-import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.slf4j.Logger;
import akka.actor.ActorRef;
import akka.actor.Cancellable;
import akka.actor.Status.Failure;
+import com.google.common.base.Preconditions;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Queue;
+import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.FrontendType;
+import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
import org.opendaylight.controller.cluster.access.concepts.MemberName;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
-import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
import org.opendaylight.controller.cluster.datastore.modification.Modification;
+import org.opendaylight.controller.cluster.datastore.utils.TransactionIdentifierUtils;
import org.slf4j.Logger;
import scala.concurrent.duration.FiniteDuration;
return "entityCommitRetry";
}
};
+ private static final FrontendType FRONTEND_TYPE = FrontendType.forName("entity-ownership-internal");
- private final Logger log;
- private int transactionIDCounter = 0;
- private final MemberName localMemberName;
private final Queue<Modification> pendingModifications = new LinkedList<>();
+ private final LocalHistoryIdentifier historyId;
+ private final Logger log;
+
private BatchedModifications inflightCommit;
private Cancellable retryCommitSchedule;
+ private long transactionIDCounter = 0;
EntityOwnershipShardCommitCoordinator(MemberName localMemberName, Logger log) {
- this.localMemberName = localMemberName;
- this.log = log;
+ this.log = Preconditions.checkNotNull(log);
+ historyId = new LocalHistoryIdentifier(
+ ClientIdentifier.create(FrontendIdentifier.create(localMemberName, FRONTEND_TYPE), 0), 0);
}
boolean handleMessage(Object message, EntityOwnershipShard shard) {
}
BatchedModifications newBatchedModifications() {
- BatchedModifications modifications = new BatchedModifications(
- TransactionIdentifier.create(localMemberName, ++transactionIDCounter).toString(),
- DataStoreVersions.CURRENT_VERSION, "");
+ BatchedModifications modifications = new BatchedModifications(TransactionIdentifierUtils.actorNameFor(
+ new TransactionIdentifier(historyId, ++transactionIDCounter)), DataStoreVersions.CURRENT_VERSION, "");
modifications.setDoCommitOnReady(true);
modifications.setReady(true);
modifications.setTotalMessagesSent(1);
+++ /dev/null
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore.identifiers;
-
-import com.google.common.base.Preconditions;
-import com.google.common.base.Supplier;
-import com.google.common.base.Suppliers;
-
-/**
- * A TransactionIdentifier which is tied to a backend transaction chain.
- */
-public class ChainedTransactionIdentifier extends TransactionIdentifier {
- private final String chainId;
- private Supplier<String> stringRepresentation;
-
- public ChainedTransactionIdentifier(final TransactionChainIdentifier chainId, final long txnCounter) {
- super(chainId.getMemberName(), txnCounter);
- Preconditions.checkNotNull(chainId);
- this.chainId = chainId.toString();
- stringRepresentation = Suppliers.memoize(new Supplier<String>() {
- @Override
- public String get() {
- return new StringBuilder(chainId.toString().length() + TX_SEPARATOR.length() + 21).
- append(chainId).append(TX_SEPARATOR).append(getCounter()).append('-').
- append(getTimestamp()).toString();
- }
- });
- }
-
-
- @Override
- public String getChainId() {
- return chainId;
- }
-
- @Override
- public String toString() {
- return stringRepresentation.get();
- }
-
-}
+++ /dev/null
-/*
- * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.cluster.datastore.identifiers;
-
-import com.google.common.base.Supplier;
-import com.google.common.base.Suppliers;
-import java.util.concurrent.atomic.AtomicLong;
-import org.opendaylight.controller.cluster.access.concepts.MemberName;
-
-public class TransactionChainIdentifier {
-
- protected static final String CHAIN_SEPARATOR = "-chn-";
-
- private final AtomicLong txnCounter = new AtomicLong();
- private final Supplier<String> stringRepresentation;
- private final MemberName memberName;
-
- public TransactionChainIdentifier(final MemberName memberName, final long counter) {
- this.memberName = memberName;
- stringRepresentation = Suppliers.memoize(() -> {
- final StringBuilder sb = new StringBuilder();
- sb.append(memberName.getName()).append(CHAIN_SEPARATOR);
- sb.append(counter);
- return sb.toString();
- });
- }
- @Override
- public String toString() {
- return stringRepresentation.get();
- }
-
- public TransactionIdentifier newTransactionIdentifier(){
- return new ChainedTransactionIdentifier(this, txnCounter.incrementAndGet());
- }
-
- public MemberName getMemberName() {
- return memberName;
- }
-}
+++ /dev/null
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.cluster.datastore.identifiers;
-
-import com.google.common.base.Preconditions;
-import org.opendaylight.controller.cluster.access.concepts.MemberName;
-
-public class TransactionIdentifier {
- protected static final String TX_SEPARATOR = "-txn-";
-
- private final MemberName memberName;
- private final long counter;
- private final long timestamp;
- private String stringRepresentation;
-
- public TransactionIdentifier(MemberName memberName, long counter) {
- this.memberName = Preconditions.checkNotNull(memberName, "memberName should not be null");
- this.counter = counter;
- this.timestamp = System.currentTimeMillis();
- }
-
- public String getChainId() {
- return "";
- }
-
- protected MemberName getMemberName() {
- return memberName;
- }
-
- protected long getCounter() {
- return counter;
- }
-
- protected long getTimestamp() {
- return timestamp;
- }
-
- public static TransactionIdentifier create(MemberName memberName, long counter) {
- return new TransactionIdentifier(memberName, counter);
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
-
- TransactionIdentifier that = (TransactionIdentifier) o;
-
- if (counter != that.counter) {
- return false;
- }
-
- if (timestamp != that.timestamp) {
- return false;
- }
-
- if (!memberName.equals(that.memberName)) {
- return false;
- }
-
- return true;
- }
-
- @Override
- public int hashCode() {
- int result = memberName.hashCode();
- result = 31 * result + (int) (counter ^ (counter >>> 32));
- result = 31 * result + (int)(timestamp ^ (timestamp >>> 32));
- return result;
- }
-
-
- @Override
- public String toString() {
- if(stringRepresentation == null) {
- stringRepresentation = new StringBuilder(memberName.getName().length() + TX_SEPARATOR.length() + 21).
- append(memberName.getName()).append(TX_SEPARATOR).append(counter).append('-').append(timestamp).toString();
- }
-
- return stringRepresentation;
- }
-
-}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.utils;
+
+import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+
+public final class TransactionIdentifierUtils {
+ private TransactionIdentifierUtils() {
+ throw new UnsupportedOperationException();
+ }
+
+ public static String actorNameFor(final TransactionIdentifier txId) {
+ final LocalHistoryIdentifier historyId = txId.getHistoryId();
+ final ClientIdentifier clientId = historyId.getClientId();
+ final FrontendIdentifier frontendId = clientId.getFrontendId();
+
+ final StringBuilder sb = new StringBuilder();
+ sb.append(frontendId.getMemberName().getName()).append(':');
+ sb.append(frontendId.getClientType().getName()).append('@');
+ sb.append(clientId.getGeneration()).append(':');
+ if (historyId.getHistoryId() != 0) {
+ sb.append(historyId.getHistoryId()).append('-');
+ }
+
+ return sb.append(txId.getTransactionId()).toString();
+ }
+}
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
import org.opendaylight.controller.cluster.access.concepts.MemberName;
import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
import org.opendaylight.controller.cluster.datastore.TransactionProxyTest.TestException;
doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper();
doReturn(dataStoreContextBuilder.build()).when(mockActorContext).getDatastoreContext();
- mockComponentFactory = TransactionContextFactory.create(mockActorContext);
+ final ClientIdentifier mockClientId = MockIdentifiers.clientIdentifier(getClass(), memberName);
+ mockComponentFactory = new TransactionContextFactory(mockActorContext, mockClientId);
Timer timer = new MetricRegistry().timer("test");
doReturn(timer).when(mockActorContext).getOperationTimer(any(String.class));
public boolean matches(Object argument) {
if(CreateTransaction.class.equals(argument.getClass())) {
CreateTransaction obj = CreateTransaction.fromSerializable(argument);
- return obj.getTransactionId().startsWith(memberName) &&
+ return obj.getTransactionId().startsWith(memberName + ':') &&
obj.getTransactionType() == type.ordinal();
}
import java.util.ArrayList;
import java.util.List;
import org.junit.Test;
-import org.opendaylight.controller.cluster.access.concepts.MemberName;
-import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.slf4j.Logger;
import scala.concurrent.Future;
* @author Thomas Pantelis
*/
public class DebugThreePhaseCommitCohortTest {
+ private final TransactionIdentifier transactionId = MockIdentifiers.transactionIdentifier(
+ DebugThreePhaseCommitCohortTest.class, "mock");
@Test
public void test() {
List<Future<Object>> expCohortFutures = new ArrayList<>();
doReturn(expCohortFutures).when(mockDelegate).getCohortFutures();
- TransactionIdentifier transactionId = TransactionIdentifier.create(MemberName.forName("1"), 1);
Throwable debugContext = new RuntimeException("mock");
- DebugThreePhaseCommitCohort cohort = new DebugThreePhaseCommitCohort(transactionId , mockDelegate , debugContext );
+ DebugThreePhaseCommitCohort cohort = new DebugThreePhaseCommitCohort(transactionId , mockDelegate , debugContext);
Logger mockLogger = mock(Logger.class);
cohort.setLogger(mockLogger);
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
-import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.DataExists;
import org.opendaylight.controller.cluster.datastore.messages.ReadData;
import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
public class LocalTransactionContextTest {
@Mock
- OperationLimiter limiter;
+ private OperationLimiter limiter;
@Mock
- TransactionIdentifier identifier;
+ private DOMStoreReadWriteTransaction readWriteTransaction;
@Mock
- DOMStoreReadWriteTransaction readWriteTransaction;
+ private LocalTransactionReadySupport mockReadySupport;
- @Mock
- LocalTransactionReadySupport mockReadySupport;
-
- LocalTransactionContext localTransactionContext;
+ private LocalTransactionContext localTransactionContext;
@Before
public void setUp() {
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.FrontendType;
+import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+
+public final class MockIdentifiers {
+ public static ClientIdentifier clientIdentifier(final Class<?> clazz, final String memberName) {
+ return ClientIdentifier.create(FrontendIdentifier.create(MemberName.forName(memberName),
+ FrontendType.forName(clazz.getSimpleName())), 0);
+ }
+
+ public static LocalHistoryIdentifier historyIdentifier(final Class<?> clazz, final String memberName) {
+ return new LocalHistoryIdentifier(clientIdentifier(clazz, memberName), 0);
+ }
+
+ public static TransactionIdentifier transactionIdentifier(final Class<?> clazz, final String memberName) {
+ return new TransactionIdentifier(historyIdentifier(clazz, memberName), 0);
+ };
+}
import static org.junit.Assert.assertEquals;
import org.junit.Test;
-import org.opendaylight.controller.cluster.access.concepts.MemberName;
-import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
* @author Thomas Pantelis
*/
public class OperationLimiterTest {
+ private final TransactionIdentifier transactionId = MockIdentifiers.transactionIdentifier(
+ OperationLimiterTest.class, "mock");
@Test
public void testOnComplete() throws Exception {
int permits = 10;
- OperationLimiter limiter = new OperationLimiter(new TransactionIdentifier(MemberName.forName("foo"), 1), permits, 1);
+ OperationLimiter limiter = new OperationLimiter(transactionId, permits, 1);
limiter.acquire(permits);
int availablePermits = 0;
import java.util.function.Function;
import org.junit.Assert;
import org.junit.Test;
+import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
import scala.concurrent.Promise;
public class TransactionChainProxyTest extends AbstractTransactionProxyTest {
+ private LocalHistoryIdentifier historyId;
+
+ @Override
+ public void setUp() {
+ super.setUp();
+ historyId = MockIdentifiers.historyIdentifier(TransactionChainProxyTest.class, memberName);
+ }
@SuppressWarnings("resource")
@Test
- public void testNewReadOnlyTransaction() throws Exception {
+ public void testNewReadOnlyTransaction() {
- DOMStoreTransaction dst = new TransactionChainProxy(mockComponentFactory).newReadOnlyTransaction();
+ DOMStoreTransaction dst = new TransactionChainProxy(mockComponentFactory, historyId).newReadOnlyTransaction();
Assert.assertTrue(dst instanceof DOMStoreReadTransaction);
}
@SuppressWarnings("resource")
@Test
- public void testNewReadWriteTransaction() throws Exception {
- DOMStoreTransaction dst = new TransactionChainProxy(mockComponentFactory).newReadWriteTransaction();
+ public void testNewReadWriteTransaction() {
+ DOMStoreTransaction dst = new TransactionChainProxy(mockComponentFactory, historyId).newReadWriteTransaction();
Assert.assertTrue(dst instanceof DOMStoreReadWriteTransaction);
}
@SuppressWarnings("resource")
@Test
- public void testNewWriteOnlyTransaction() throws Exception {
- DOMStoreTransaction dst = new TransactionChainProxy(mockComponentFactory).newWriteOnlyTransaction();
+ public void testNewWriteOnlyTransaction() {
+ DOMStoreTransaction dst = new TransactionChainProxy(mockComponentFactory, historyId).newWriteOnlyTransaction();
Assert.assertTrue(dst instanceof DOMStoreWriteTransaction);
}
@Test
- public void testClose() throws Exception {
- new TransactionChainProxy(mockComponentFactory).close();
+ public void testClose() {
+ new TransactionChainProxy(mockComponentFactory, historyId).close();
verify(mockActorContext, times(1)).broadcast(any(Function.class));
}
- @Test
- public void testTransactionChainsHaveUniqueId() {
- try (TransactionChainProxy one = new TransactionChainProxy(mockComponentFactory)) {
- try (TransactionChainProxy two = new TransactionChainProxy(mockComponentFactory)) {
-
- Assert.assertNotEquals(one.getTransactionChainId(), two.getTransactionChainId());
- }
- }
- }
-
@Test
public void testRateLimitingUsedInReadWriteTxCreation() {
- try (TransactionChainProxy txChainProxy = new TransactionChainProxy(mockComponentFactory)) {
+ try (TransactionChainProxy txChainProxy = new TransactionChainProxy(mockComponentFactory, historyId)) {
txChainProxy.newReadWriteTransaction();
@Test
public void testRateLimitingUsedInWriteOnlyTxCreation() {
- try (TransactionChainProxy txChainProxy = new TransactionChainProxy(mockComponentFactory)) {
+ try (TransactionChainProxy txChainProxy = new TransactionChainProxy(mockComponentFactory, historyId)) {
txChainProxy.newWriteOnlyTransaction();
@Test
public void testRateLimitingNotUsedInReadOnlyTxCreation() {
- try (TransactionChainProxy txChainProxy = new TransactionChainProxy(mockComponentFactory)) {
+ try (TransactionChainProxy txChainProxy = new TransactionChainProxy(mockComponentFactory, historyId)) {
txChainProxy.newReadOnlyTransaction();
public void testChainedWriteOnlyTransactions() throws Exception {
dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
- try (TransactionChainProxy txChainProxy = new TransactionChainProxy(mockComponentFactory)) {
+ try (TransactionChainProxy txChainProxy = new TransactionChainProxy(mockComponentFactory, historyId)) {
ActorRef txActorRef1 = setupActorContextWithoutInitialCreateTransaction(getSystem());
*/
@Test
public void testChainedReadWriteTransactions() throws Exception {
- try (TransactionChainProxy txChainProxy = new TransactionChainProxy(mockComponentFactory)) {
+ try (TransactionChainProxy txChainProxy = new TransactionChainProxy(mockComponentFactory, historyId)) {
ActorRef txActorRef1 = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
}
@Test(expected = IllegalStateException.class)
- public void testChainedWriteTransactionsWithPreviousTxNotReady() throws Exception {
+ public void testChainedWriteTransactionsWithPreviousTxNotReady() {
ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
expectBatchedModifications(actorRef, 1);
- try (TransactionChainProxy txChainProxy = new TransactionChainProxy(mockComponentFactory)) {
+ try (TransactionChainProxy txChainProxy = new TransactionChainProxy(mockComponentFactory, historyId)) {
DOMStoreWriteTransaction writeTx1 = txChainProxy.newWriteOnlyTransaction();
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
-import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
public class TransactionContextWrapperTest {
-
- @Mock
- TransactionIdentifier identifier;
-
@Mock
- ActorContext actorContext;
+ private ActorContext actorContext;
@Mock
- TransactionContext transactionContext;
+ private TransactionContext transactionContext;
- TransactionContextWrapper transactionContextWrapper;
+ private TransactionContextWrapper transactionContextWrapper;
@Before
public void setUp(){
MockitoAnnotations.initMocks(this);
doReturn(DatastoreContext.newBuilder().build()).when(actorContext).getDatastoreContext();
- transactionContextWrapper = new TransactionContextWrapper(identifier, actorContext);
+ transactionContextWrapper = new TransactionContextWrapper(MockIdentifiers.transactionIdentifier(
+ TransactionContextWrapperTest.class, "mock"), actorContext);
}
@Test
Object id = transactionProxy.getIdentifier();
assertNotNull("getIdentifier returned null", id);
- assertTrue("Invalid identifier: " + id, id.toString().startsWith(memberName));
+ assertTrue("Invalid identifier: " + id, id.toString().contains(MemberName.forName(memberName).toString()));
}
@Test
public boolean matches(Object argument) {
if(ShardTransactionMessages.CreateTransaction.class.equals(argument.getClass())) {
CreateTransaction obj = CreateTransaction.fromSerializable(argument);
- return obj.getTransactionId().startsWith(memberName) &&
+ return obj.getTransactionId().startsWith(memberName + ':') &&
obj.getTransactionType() == type.ordinal();
}
}
@Test
- public void testClose() throws Exception{
+ public void testClose() {
ActorRef actorRef = setupPreBoronActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
expectBatchedModifications(actorRef, 1);
+++ /dev/null
-/*
- * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.cluster.datastore.identifiers;
-
-import static org.hamcrest.CoreMatchers.startsWith;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-import org.junit.Test;
-import org.opendaylight.controller.cluster.access.concepts.MemberName;
-
-public class ChainedTransactionIdentifierTest {
-
- @Test
- public void testToString(){
- TransactionChainIdentifier chainId = new TransactionChainIdentifier(MemberName.forName("member-1"), 99);
- ChainedTransactionIdentifier chainedTransactionIdentifier = new ChainedTransactionIdentifier(chainId, 100);
-
- String txnId = chainedTransactionIdentifier.toString();
-
- assertTrue(txnId.contains("member-1"));
- assertTrue(txnId.contains("100"));
- assertTrue(txnId.contains("99"));
-
- assertThat(txnId, startsWith("member-1-chn-99-txn-100-"));
- }
-
-}
\ No newline at end of file
+++ /dev/null
-/*
- * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.cluster.datastore.identifiers;
-
-import static org.hamcrest.CoreMatchers.startsWith;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
-import org.junit.Test;
-import org.opendaylight.controller.cluster.access.concepts.MemberName;
-
-public class TransactionChainIdentifierTest {
- private static final MemberName MEMBER_1 = MemberName.forName("member-1");
-
- @Test
- public void testToString(){
- TransactionChainIdentifier transactionChainIdentifier = new TransactionChainIdentifier(MEMBER_1, 99);
-
- String id = transactionChainIdentifier.toString();
-
- assertEquals("member-1-chn-99", id);
- }
-
- @Test
- public void testNewTransactionIdentifier(){
- TransactionChainIdentifier transactionChainIdentifier = new TransactionChainIdentifier(MEMBER_1, 99);
-
- TransactionIdentifier txId1 = transactionChainIdentifier.newTransactionIdentifier();
-
- assertThat(txId1.toString(), startsWith("member-1-chn-99-txn-1-"));
-
- TransactionIdentifier txId2 = transactionChainIdentifier.newTransactionIdentifier();
-
- assertThat(txId2.toString(), startsWith("member-1-chn-99-txn-2-"));
- }
-
-}
\ No newline at end of file