import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Ticker;
+import com.google.common.base.Verify;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Range;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
-import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import org.opendaylight.controller.cluster.datastore.messages.GetShardDataTree;
import org.opendaylight.controller.cluster.datastore.messages.OnDemandShardState;
import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
+import org.opendaylight.controller.cluster.datastore.messages.PersistAbortTransactionPayload;
import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
+import org.opendaylight.controller.cluster.datastore.persisted.AbortTransactionPayload;
import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot.ShardSnapshot;
import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
private final ShardTransactionMessageRetrySupport messageRetrySupport;
- private final FrontendMetadata frontendMetadata = new FrontendMetadata();
- private final Map<FrontendIdentifier, LeaderFrontendState> knownFrontends = new HashMap<>();
+ private final FrontendMetadata frontendMetadata;
+ private Map<FrontendIdentifier, LeaderFrontendState> knownFrontends = ImmutableMap.of();
protected Shard(final AbstractBuilder<?, ?> builder) {
super(builder.getId().toString(), builder.getPeerAddresses(),
this.name = builder.getId().toString();
this.datastoreContext = builder.getDatastoreContext();
this.restoreFromSnapshot = builder.getRestoreFromSnapshot();
+ this.frontendMetadata = new FrontendMetadata(name);
setPersistence(datastoreContext.isPersistent());
new ShardDataChangeListenerPublisherActorProxy(getContext(), name + "-DCL-publisher");
if (builder.getDataTree() != null) {
store = new ShardDataTree(this, builder.getSchemaContext(), builder.getDataTree(),
- treeChangeListenerPublisher, dataChangeListenerPublisher, name);
+ treeChangeListenerPublisher, dataChangeListenerPublisher, name, frontendMetadata);
} else {
store = new ShardDataTree(this, builder.getSchemaContext(), builder.getTreeType(),
builder.getDatastoreContext().getStoreRoot(), treeChangeListenerPublisher,
- dataChangeListenerPublisher, name);
+ dataChangeListenerPublisher, name, frontendMetadata);
}
shardMBean = ShardMBeanFactory.getShardStatsMBean(name, datastoreContext.getDataStoreMXBeanType(), this);
getRaftActorContext().getConfigParams().getIsolatedCheckIntervalInMillis());
transactionActorFactory = new ShardTransactionActorFactory(store, datastoreContext,
- new Dispatchers(context().system().dispatchers()).getDispatcherPath(
- Dispatchers.DispatcherType.Transaction), self(), getContext(), shardMBean);
+ new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Transaction),
+ self(), getContext(), shardMBean, builder.getId().getShardName());
snapshotCohort = ShardSnapshotCohort.create(getContext(), builder.getId().getMemberName(), store, LOG,
this.name);
} else if (message instanceof DataTreeCohortActorRegistry.CohortRegistryCommand) {
store.processCohortRegistryCommand(getSender(),
(DataTreeCohortActorRegistry.CohortRegistryCommand) message);
+ } else if (message instanceof PersistAbortTransactionPayload) {
+ final TransactionIdentifier txId = ((PersistAbortTransactionPayload) message).getTransactionId();
+ persistPayload(txId, AbortTransactionPayload.create(txId), true);
} else {
super.handleNonRaftCommand(message);
}
private void closeTransactionChain(final CloseTransactionChain closeTransactionChain) {
final LocalHistoryIdentifier id = closeTransactionChain.getIdentifier();
- store.closeTransactionChain(id, () -> store.purgeTransactionChain(id, null));
+ store.closeTransactionChain(id, null);
+ store.purgeTransactionChain(id, null);
}
@SuppressWarnings("checkstyle:IllegalCatch")
persistenceId(), getId());
}
- store.closeAllTransactionChains();
+ store.purgeLeaderState();
}
if (hasLeader && !isIsolatedLeader()) {
protected void onLeaderChanged(final String oldLeader, final String newLeader) {
shardMBean.incrementLeadershipChangeCount();
- boolean hasLeader = hasLeader();
- if (hasLeader && !isLeader()) {
+ final boolean hasLeader = hasLeader();
+ if (!hasLeader) {
+ // No leader implies we are not the leader, lose frontend state if we have any. This also places
+ // an explicit guard so the map will not get modified accidentally.
+ if (!knownFrontends.isEmpty()) {
+ LOG.debug("{}: removing frontend state for {}", persistenceId(), knownFrontends.keySet());
+ knownFrontends = ImmutableMap.of();
+ }
+ return;
+ }
+
+ if (!isLeader()) {
// Another leader was elected. If we were the previous leader and had pending transactions, convert
// them to transaction messages and send to the new leader.
ActorSelection leader = getLeader();
commitCoordinator.abortPendingTransactions("The transacton was aborted due to inflight leadership "
+ "change and the leader address isn't available.", this);
}
+ } else {
+ // We have become the leader, we need to reconstruct frontend state
+ knownFrontends = Verify.verifyNotNull(frontendMetadata.toLeaderState(this));
+ LOG.debug("{}: became leader with frontend state for {}", persistenceId(), knownFrontends.keySet());
}
- if (hasLeader && !isIsolatedLeader()) {
+ if (!isIsolatedLeader()) {
messageRetrySupport.retryMessages();
}
}