*/
package org.opendaylight.controller.cluster.datastore;
+import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Verify.verify;
+import static java.util.Objects.requireNonNull;
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.serialization.JavaSerializer;
import akka.serialization.Serialization;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
import com.google.common.base.Ticker;
import com.google.common.base.Verify;
import com.google.common.collect.ImmutableList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalLong;
import java.util.concurrent.TimeUnit;
import org.eclipse.jdt.annotation.NonNull;
import org.eclipse.jdt.annotation.Nullable;
import org.opendaylight.controller.cluster.datastore.messages.MakeLeaderLocal;
import org.opendaylight.controller.cluster.datastore.messages.OnDemandShardState;
import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
-import org.opendaylight.controller.cluster.datastore.messages.PersistAbortTransactionPayload;
import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
-import org.opendaylight.controller.cluster.datastore.persisted.AbortTransactionPayload;
import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot.ShardSnapshot;
import org.opendaylight.controller.cluster.datastore.persisted.DisableTrackingPayload;
-import org.opendaylight.controller.cluster.datastore.persisted.PurgeTransactionPayload;
import org.opendaylight.controller.cluster.messaging.MessageAssembler;
import org.opendaylight.controller.cluster.messaging.MessageSlicer;
import org.opendaylight.controller.cluster.messaging.SliceOptions;
private final ShardTransactionMessageRetrySupport messageRetrySupport;
- private final FrontendMetadata frontendMetadata;
+ @VisibleForTesting
+ final FrontendMetadata frontendMetadata;
+
private Map<FrontendIdentifier, LeaderFrontendState> knownFrontends = ImmutableMap.of();
private boolean paused;
}
@Override
- public void postStop() {
+ public void postStop() throws Exception {
LOG.info("Stopping Shard {}", persistenceId());
super.postStop();
} else if (message instanceof DataTreeCohortActorRegistry.CohortRegistryCommand) {
store.processCohortRegistryCommand(getSender(),
(DataTreeCohortActorRegistry.CohortRegistryCommand) message);
- } else if (message instanceof PersistAbortTransactionPayload) {
- final TransactionIdentifier txId = ((PersistAbortTransactionPayload) message).getTransactionId();
- persistPayload(txId, AbortTransactionPayload.create(txId,
- datastoreContext.getInitialPayloadSerializedBufferCapacity()), true);
- persistPayload(txId, PurgeTransactionPayload.create(txId,
- datastoreContext.getInitialPayloadSerializedBufferCapacity()), false);
} else if (message instanceof MakeLeaderLocal) {
onMakeLeaderLocal();
} else if (RESUME_NEXT_PENDING_TRANSACTION.equals(message)) {
requestMessageAssembler.checkExpiredAssembledMessageState();
}
- private Optional<Long> updateAccess(final SimpleShardDataTreeCohort cohort) {
+ private OptionalLong updateAccess(final SimpleShardDataTreeCohort cohort) {
final FrontendIdentifier frontend = cohort.getIdentifier().getHistoryId().getClientId().getFrontendId();
final LeaderFrontendState state = knownFrontends.get(frontend);
if (state == null) {
// Not tell-based protocol, do nothing
- return Optional.absent();
+ return OptionalLong.empty();
}
if (isIsolatedLeader()) {
// We are isolated and no new request can come through until we emerge from it. We are still updating
// liveness of frontend when we see it attempting to communicate. Use the last access timer.
- return Optional.of(state.getLastSeenTicks());
+ return OptionalLong.of(state.getLastSeenTicks());
}
// If this frontend has freshly connected, give it some time to catch up before killing its transactions.
- return Optional.of(state.getLastConnectTicks());
+ return OptionalLong.of(state.getLastConnectTicks());
}
private void disableTracking(final DisableTrackingPayload payload) {
}
} else {
LOG.debug("{}: leader state for {} not found", persistenceId(), clientId);
+ knownFrontends.put(frontendId, new LeaderFrontendState.Disabled(persistenceId(), clientId,
+ getDataStore()));
}
}
}
if (isLeader()) {
final LocalHistoryIdentifier id = closeTransactionChain.getIdentifier();
askProtocolEncountered(id.getClientId());
-
- // FIXME: CONTROLLER-1628: stage purge once no transactions are present
- store.closeTransactionChain(id, null);
- store.purgeTransactionChain(id, null);
+ store.closeTransactionChain(id);
} else if (getLeader() != null) {
getLeader().forward(closeTransactionChain, getContext());
} else {
// Called on leader only
private void askProtocolEncountered(final ClientIdentifier clientId) {
- final LeaderFrontendState state = knownFrontends.get(clientId.getFrontendId());
- if (state instanceof LeaderFrontendState.Enabled) {
+ final FrontendIdentifier frontend = clientId.getFrontendId();
+ final LeaderFrontendState state = knownFrontends.get(frontend);
+ if (!(state instanceof LeaderFrontendState.Disabled)) {
LOG.debug("{}: encountered ask-based client {}, disabling transaction tracking", persistenceId(), clientId);
+ if (knownFrontends.isEmpty()) {
+ knownFrontends = new HashMap<>();
+ }
+ knownFrontends.put(frontend, new LeaderFrontendState.Disabled(persistenceId(), clientId, getDataStore()));
+
persistPayload(clientId, DisableTrackingPayload.create(clientId,
datastoreContext.getInitialPayloadSerializedBufferCapacity()), false);
}
paused = true;
// Tell-based protocol can replay transaction state, so it is safe to blow it up when we are paused.
- knownFrontends.values().forEach(LeaderFrontendState::retire);
- knownFrontends = ImmutableMap.of();
+ if (datastoreContext.isUseTellBasedProtocol()) {
+ knownFrontends.values().forEach(LeaderFrontendState::retire);
+ knownFrontends = ImmutableMap.of();
+ }
store.setRunOnPendingTransactionsComplete(operation);
}
}
public abstract static class AbstractBuilder<T extends AbstractBuilder<T, S>, S extends Shard> {
- private final Class<S> shardClass;
+ private final Class<? extends S> shardClass;
private ShardIdentifier id;
private Map<String, String> peerAddresses = Collections.emptyMap();
private DatastoreContext datastoreContext;
private SchemaContextProvider schemaContextProvider;
private DatastoreSnapshot.ShardSnapshot restoreFromSnapshot;
private DataTree dataTree;
+
private volatile boolean sealed;
- protected AbstractBuilder(final Class<S> shardClass) {
+ protected AbstractBuilder(final Class<? extends S> shardClass) {
this.shardClass = shardClass;
}
protected void checkSealed() {
- Preconditions.checkState(!sealed, "Builder isalready sealed - further modifications are not allowed");
+ checkState(!sealed, "Builder isalready sealed - further modifications are not allowed");
}
@SuppressWarnings("unchecked")
public T schemaContextProvider(final SchemaContextProvider newSchemaContextProvider) {
checkSealed();
- this.schemaContextProvider = Preconditions.checkNotNull(newSchemaContextProvider);
+ this.schemaContextProvider = requireNonNull(newSchemaContextProvider);
return self();
}
}
protected void verify() {
- Preconditions.checkNotNull(id, "id should not be null");
- Preconditions.checkNotNull(peerAddresses, "peerAddresses should not be null");
- Preconditions.checkNotNull(datastoreContext, "dataStoreContext should not be null");
- Preconditions.checkNotNull(schemaContextProvider, "schemaContextProvider should not be null");
+ requireNonNull(id, "id should not be null");
+ requireNonNull(peerAddresses, "peerAddresses should not be null");
+ requireNonNull(datastoreContext, "dataStoreContext should not be null");
+ requireNonNull(schemaContextProvider, "schemaContextProvider should not be null");
}
public Props props() {
public static class Builder extends AbstractBuilder<Builder, Shard> {
Builder() {
- super(Shard.class);
+ this(Shard.class);
+ }
+
+ Builder(final Class<? extends Shard> shardClass) {
+ super(shardClass);
}
}