import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Function;
import javax.annotation.Nonnull;
-import org.opendaylight.controller.cluster.datastore.identifiers.TransactionChainIdentifier;
-import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
import org.opendaylight.controller.md.sal.common.api.data.TransactionChainClosedException;
};
private static final Logger LOG = LoggerFactory.getLogger(TransactionChainProxy.class);
- private static final AtomicInteger CHAIN_COUNTER = new AtomicInteger();
private static final AtomicReferenceFieldUpdater<TransactionChainProxy, State> STATE_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(TransactionChainProxy.class, State.class, "currentState");
- private final TransactionChainIdentifier transactionChainId;
private final TransactionContextFactory parent;
private volatile State currentState = IDLE_STATE;
*/
private final ConcurrentMap<TransactionIdentifier, Promise<Object>> priorReadOnlyTxPromises = new ConcurrentHashMap<>();
- TransactionChainProxy(final TransactionContextFactory parent) {
- super(parent.getActorContext());
-
- transactionChainId = new TransactionChainIdentifier(parent.getActorContext().getCurrentMemberName(), CHAIN_COUNTER.incrementAndGet());
+ TransactionChainProxy(final TransactionContextFactory parent, final LocalHistoryIdentifier historyId) {
+ super(parent.getActorContext(), historyId);
this.parent = parent;
}
- public String getTransactionChainId() {
- return transactionChainId.toString();
- }
-
@Override
public DOMStoreReadTransaction newReadOnlyTransaction() {
currentState.checkReady();
getActorContext().broadcast(new Function<Short, Object>() {
@Override
public Object apply(Short version) {
- return new CloseTransactionChain(transactionChainId.toString(), version).toSerializable();
+ return new CloseTransactionChain(getHistoryId(), version).toSerializable();
}
});
}
LOG.debug("Tx: {} - waiting for ready futures with pending Tx {}", txId, previousTransactionId);
} else {
previousTransactionId = "";
- LOG.debug("Waiting for ready futures on chain {}", getTransactionChainId());
+ LOG.debug("Waiting for ready futures on chain {}", getHistoryId());
}
previous = combineFutureWithPossiblePriorReadOnlyTxFutures(previous, txId);
promise.success(null);
}
}
-
- @Override
- protected TransactionIdentifier nextIdentifier() {
- return transactionChainId.newTransactionIdentifier();
- }
}