import com.google.common.annotations.Beta;
import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
import com.google.common.base.Verify;
import java.util.Collection;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException;
import org.opendaylight.controller.cluster.access.concepts.SuccessEnvelope;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.controller.cluster.common.actor.Dispatchers.DispatcherType;
+import org.opendaylight.controller.cluster.io.FileBackedOutputStreamFactory;
+import org.opendaylight.controller.cluster.messaging.MessageAssembler;
import org.opendaylight.yangtools.concepts.Identifiable;
import org.opendaylight.yangtools.concepts.WritableIdentifier;
import org.slf4j.Logger;
// Live connections to backend shards, keyed by shard cookie. Backed by a
// ConcurrentHashMap so lookups on the message path are safe without locking.
private final Map<Long, AbstractClientConnection<T>> connections = new ConcurrentHashMap<>();
// NOTE(review): presumably guards structural replacement of connections (a writeLock()
// is taken around connection replacement further down) -- confirm against InversibleLock.
private final InversibleLock connectionsLock = new InversibleLock();
// Resolves backend information for shards; never null (checked in the constructor).
private final BackendInfoResolver<T> resolver;
// Reassembles sliced response messages; assembled messages are re-delivered to self.
+ private final MessageAssembler responseMessageAssembler;
/**
 * Creates the behavior with the given actor context and backend resolver.
 *
 * <p>Also sets up a {@link MessageAssembler} which reassembles sliced response
 * messages, spilling large payloads to file-backed streams, and re-delivers each
 * fully assembled message to this actor.
 *
 * @param context client actor context backing this behavior
 * @param resolver resolver used to look up backend information, must not be null
 */
protected ClientActorBehavior(@Nonnull final ClientActorContext context,
        @Nonnull final BackendInfoResolver<T> resolver) {
    super(context);
    this.resolver = Preconditions.checkNotNull(resolver);

    final ClientActorConfig actorConfig = context.config();
    final FileBackedOutputStreamFactory streamFactory = new FileBackedOutputStreamFactory(
            actorConfig.getFileBackedStreamingThreshold(), actorConfig.getTempFileDirectory());
    responseMessageAssembler = MessageAssembler.builder()
            .logContext(persistenceId())
            .fileBackedStreamFactory(streamFactory)
            .assembledMessageCallback((message, sender) -> context.self().tell(message, sender))
            .build();
}
@Override
return context().getIdentifier();
}
+ @Override
+ public void close() {
+ responseMessageAssembler.close();
+ }
+
/**
* Get a connection to a shard.
*
if (command instanceof InternalCommand) {
return ((InternalCommand<T>) command).execute(this);
}
+
if (command instanceof SuccessEnvelope) {
return onRequestSuccess((SuccessEnvelope) command);
}
+
if (command instanceof FailureEnvelope) {
return internalOnRequestFailure((FailureEnvelope) command);
}
+ if (MessageAssembler.isHandledMessage(command)) {
+ context().dispatchers().getDispatcher(DispatcherType.Serialization).execute(
+ () -> responseMessageAssembler.handleMessage(command, context().self()));
+ return this;
+ }
+
return onCommand(command);
}
}
private ClientActorBehavior<T> internalOnRequestFailure(final FailureEnvelope command) {
+ final AbstractClientConnection<T> conn = getConnection(command);
+ if (conn != null) {
+ /*
+ * We are talking to multiple actors, which may be lagging behind our state significantly. This has
+ * the effect that we may be receiving responses from a previous connection after we have created a new
+ * one to a different actor.
+ *
+ * Since we are already replaying requests to the new actor, we want to ignore errors reported on the old
+ * connection -- for example NotLeaderException, which must not cause a new reconnect. Check the envelope's
+ * sessionId and if it does not match our current connection just ignore it.
+ */
+ final Optional<T> optBackend = conn.getBackendInfo();
+ if (optBackend.isPresent() && optBackend.get().getSessionId() != command.getSessionId()) {
+ LOG.debug("{}: Mismatched current connection {} and envelope {}, ignoring response", persistenceId(),
+ conn, command);
+ return this;
+ }
+ }
+
final RequestFailure<?, ?> failure = command.getMessage();
final RequestException cause = failure.getCause();
if (cause instanceof RetiredGenerationException) {
return null;
}
if (cause instanceof NotLeaderException) {
- final AbstractClientConnection<T> conn = getConnection(command);
if (conn instanceof ReconnectingClientConnection) {
// Already reconnecting, do not churn the logs
return this;
}
}
if (cause instanceof OutOfSequenceEnvelopeException) {
- final AbstractClientConnection<T> conn = getConnection(command);
if (conn instanceof ReconnectingClientConnection) {
// Already reconnecting, do not churn the logs
return this;
LOG.info("{}: resolved shard {} to {}", persistenceId(), shard, backend);
final long stamp = connectionsLock.writeLock();
try {
+ final Stopwatch sw = Stopwatch.createStarted();
+
// Create a new connected connection
final ConnectedClientConnection<T> newConn = new ConnectedClientConnection<>(conn.context(),
conn.cookie(), backend);
LOG.warn("{}: old connection {} does not match existing {}, new connection {} in limbo",
persistenceId(), conn, existing, newConn);
} else {
- LOG.info("{}: replaced connection {} with {}", persistenceId(), conn, newConn);
+ LOG.info("{}: replaced connection {} with {} in {}", persistenceId(), conn, newConn, sw);
}
} finally {
connectionsLock.unlockWrite(stamp);