import com.google.common.annotations.Beta;
import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
import com.google.common.base.Verify;
import java.util.Collection;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException;
import org.opendaylight.controller.cluster.access.concepts.SuccessEnvelope;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.controller.cluster.common.actor.Dispatchers.DispatcherType;
+import org.opendaylight.controller.cluster.io.FileBackedOutputStreamFactory;
+import org.opendaylight.controller.cluster.messaging.MessageAssembler;
import org.opendaylight.yangtools.concepts.Identifiable;
import org.opendaylight.yangtools.concepts.WritableIdentifier;
import org.slf4j.Logger;
// Cookie -> connection map; lock-free reads via ConcurrentHashMap, mutations additionally
// serialized through connectionsLock (see replace/remove paths below).
private final Map<Long, AbstractClientConnection<T>> connections = new ConcurrentHashMap<>();
// Guards connection replacement/removal so concurrent transitions cannot interleave.
private final InversibleLock connectionsLock = new InversibleLock();
// Resolves shard cookies to backend info; required non-null in the constructor.
private final BackendInfoResolver<T> resolver;
+ // Reassembles incoming response messages (presumably sliced/chunked transfers — confirm
+ // against MessageAssembler docs); built in the constructor, released in close().
+ private final MessageAssembler responseMessageAssembler;
/**
 * Creates this behavior with the given actor context and backend resolver, and sets up the
 * MessageAssembler used to reassemble responses delivered to this actor.
 *
 * @param context client actor context, passed to the superclass
 * @param resolver backend resolver, must not be null
 */
protected ClientActorBehavior(@Nonnull final ClientActorContext context,
@Nonnull final BackendInfoResolver<T> resolver) {
super(context);
this.resolver = Preconditions.checkNotNull(resolver);
+
+ final ClientActorConfig config = context.config();
+ // Assembled payloads larger than the configured threshold spill to file-backed streams
+ // in the configured temp directory rather than staying on-heap.
+ responseMessageAssembler = MessageAssembler.builder().logContext(persistenceId())
+ .fileBackedStreamFactory(new FileBackedOutputStreamFactory(config.getFileBackedStreamingThreshold(),
+ config.getTempFileDirectory()))
+ // Once fully assembled, re-deliver the message to ourselves preserving the original sender.
+ .assembledMessageCallback((message, sender) -> context.self().tell(message, sender)).build();
}
@Override
return context().getIdentifier();
}
+ /**
+  * Closes this behavior, releasing resources held by the response MessageAssembler
+  * (e.g. any file-backed streams it created).
+  */
+ @Override
+ public void close() {
+ responseMessageAssembler.close();
+ }
+
/**
* Get a connection to a shard.
*
// NOTE(review): fragment of the command-dispatch method; its signature is outside this chunk.
// Internal commands execute directly against this behavior.
if (command instanceof InternalCommand) {
return ((InternalCommand<T>) command).execute(this);
}
+
if (command instanceof SuccessEnvelope) {
return onRequestSuccess((SuccessEnvelope) command);
}
+
if (command instanceof FailureEnvelope) {
return internalOnRequestFailure((FailureEnvelope) command);
}
+ // Message-slice traffic is routed to the assembler. It runs on the Serialization
+ // dispatcher so assembly work (deserialization, possible file I/O) stays off the
+ // actor's main processing path; the assembled result is tell()-ed back to self.
+ if (MessageAssembler.isHandledMessage(command)) {
+ context().dispatchers().getDispatcher(DispatcherType.Serialization).execute(
+ () -> responseMessageAssembler.handleMessage(command, context().self()));
+ return this;
+ }
+
return onCommand(command);
}
}
// NOTE(review): the tail of this method lies beyond this chunk.
// Handles a failure envelope: drops stale-session responses, and reconnects the affected
// connection on leadership loss or sequencing mismatch, now passing the failure cause along.
private ClientActorBehavior<T> internalOnRequestFailure(final FailureEnvelope command) {
+ final AbstractClientConnection<T> conn = getConnection(command);
+ if (conn != null) {
+ /*
+ * We are talking to multiple actors, which may be lagging behind our state significantly. This has
+ * the effect that we may be receiving responses from a previous connection after we have created a new
+ * one to a different actor.
+ *
+ * Since we are already replaying requests to the new actor, we want to ignore errors reported on the old
+ * connection -- for example NotLeaderException, which must not cause a new reconnect. Check the envelope's
+ * sessionId and if it does not match our current connection just ignore it.
+ */
+ final Optional<T> optBackend = conn.getBackendInfo();
+ if (optBackend.isPresent() && optBackend.get().getSessionId() != command.getSessionId()) {
+ LOG.debug("{}: Mismatched current connection {} and envelope {}, ignoring response", persistenceId(),
+ conn, command);
+ return this;
+ }
+ }
+
final RequestFailure<?, ?> failure = command.getMessage();
final RequestException cause = failure.getCause();
// A retired generation is terminal for this client; returning null presumably ends this
// behavior -- TODO confirm against the caller's null handling.
if (cause instanceof RetiredGenerationException) {
return null;
}
if (cause instanceof NotLeaderException) {
- final AbstractClientConnection<T> conn = getConnection(command);
if (conn instanceof ReconnectingClientConnection) {
// Already reconnecting, do not churn the logs
return this;
} else if (conn != null) {
LOG.info("{}: connection {} indicated no leadership, reconnecting it", persistenceId(), conn, cause);
+ // The cause is now forwarded so the reconnect can record why it happened.
return conn.reconnect(this, cause);
}
}
if (cause instanceof OutOfSequenceEnvelopeException) {
- final AbstractClientConnection<T> conn = getConnection(command);
if (conn instanceof ReconnectingClientConnection) {
// Already reconnecting, do not churn the logs
return this;
} else if (conn != null) {
+ // NOTE(review): "indicated no sequencing mismatch" reads like a copy/paste from the
+ // leadership case above -- the "no" looks wrong for this branch; verify the intended text.
LOG.info("{}: connection {} indicated no sequencing mismatch on {} sequence {}, reconnecting it",
persistenceId(), conn, failure.getTarget(), failure.getSequence(), cause);
+ return conn.reconnect(this, cause);
}
}
// NOTE(review): fragment of a backend-connect-completion method; its head is outside this chunk.
LOG.info("{}: resolved shard {} to {}", persistenceId(), shard, backend);
// Write-lock the connections map so this replacement cannot interleave with removeConnection()
// or the reconnect path.
final long stamp = connectionsLock.writeLock();
try {
+ // Stopwatch only feeds the success log below, recording how long the swap took.
+ final Stopwatch sw = Stopwatch.createStarted();
+
// Create a new connected connection
final ConnectedClientConnection<T> newConn = new ConnectedClientConnection<>(conn.context(),
conn.cookie(), backend);
conn.finishReplay(forwarder);
// Make sure new lookups pick up the new connection
+ // replace() is conditional on the old connection still being registered; if something else
+ // swapped it first, log the stale state instead of clobbering it.
+ if (!connections.replace(shard, conn, newConn)) {
+ final AbstractClientConnection<T> existing = connections.get(conn.cookie());
+ LOG.warn("{}: old connection {} does not match existing {}, new connection {} in limbo",
+ persistenceId(), conn, existing, newConn);
+ } else {
+ LOG.info("{}: replaced connection {} with {} in {}", persistenceId(), conn, newConn, sw);
+ }
} finally {
connectionsLock.unlockWrite(stamp);
}
}
// Removes the given connection from tracking, but only if it is still the one registered
// for its cookie; otherwise logs what superseded it (or that it was never tracked).
void removeConnection(final AbstractClientConnection<?> conn) {
- connections.remove(conn.cookie(), conn);
- LOG.debug("{}: removed connection {}", persistenceId(), conn);
+ // Write lock prevents racing with the replace paths above/below.
+ final long stamp = connectionsLock.writeLock();
+ try {
+ // Conditional remove: a concurrent replacement means conn is stale and must not be removed.
+ if (!connections.remove(conn.cookie(), conn)) {
+ final AbstractClientConnection<T> existing = connections.get(conn.cookie());
+ if (existing != null) {
+ LOG.warn("{}: failed to remove connection {}, as it was superseded by {}", persistenceId(), conn,
+ existing);
+ } else {
+ LOG.warn("{}: failed to remove connection {}, as it was not tracked", persistenceId(), conn);
+ }
+ } else {
+ LOG.info("{}: removed connection {}", persistenceId(), conn);
+ }
+ } finally {
+ connectionsLock.unlockWrite(stamp);
+ }
}
// NOTE(review): fragment of the reconnect path; both the method head and tail are outside this chunk.
@SuppressWarnings("unchecked")
final ReconnectingClientConnection<T> conn = (ReconnectingClientConnection<T>)newConn;
LOG.info("{}: connection {} reconnecting as {}", persistenceId(), oldConn, newConn);
- final boolean replaced = connections.replace(oldConn.cookie(), (AbstractClientConnection<T>)oldConn, conn);
- if (!replaced) {
- final AbstractClientConnection<T> existing = connections.get(oldConn.cookie());
- LOG.warn("{}: old connection {} does not match existing {}, new connection {} in limbo", persistenceId(),
- oldConn, existing, newConn);
+ // The replacement now happens under the write lock so it cannot interleave with
+ // removeConnection() or the connect-finished swap.
+ final long stamp = connectionsLock.writeLock();
+ try {
+ // Conditional replace: only swap if oldConn is still the registered connection.
+ final boolean replaced = connections.replace(oldConn.cookie(), (AbstractClientConnection<T>)oldConn, conn);
+ if (!replaced) {
+ final AbstractClientConnection<T> existing = connections.get(oldConn.cookie());
+ if (existing != null) {
+ LOG.warn("{}: failed to replace connection {}, as it was superseded by {}", persistenceId(), conn,
+ existing);
+ } else {
+ LOG.warn("{}: failed to replace connection {}, as it was not tracked", persistenceId(), conn);
+ }
+ }
+ } finally {
+ connectionsLock.unlockWrite(stamp);
}
final Long shard = oldConn.cookie();