}
@GuardedBy("lock")
- abstract ClientActorBehavior<T> reconnectConnection(ClientActorBehavior<T> current);
+ abstract ClientActorBehavior<T> lockedReconnect(ClientActorBehavior<T> current);
private long readTime() {
return context.ticker().read();
}
}
+ final ClientActorBehavior<T> reconnect(final ClientActorBehavior<T> current) {
+ lock.lock();
+ try {
+ return lockedReconnect(current);
+ } finally {
+ lock.unlock();
+ }
+ }
+
/**
* Schedule a timer to fire on the actor thread after a delay.
*
delay = lockedCheckTimeout(now);
if (delay == null) {
// We have timed out. There is no point in scheduling a timer
- return reconnectConnection(current);
+ return lockedReconnect(current);
}
if (delay.isPresent()) {
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
+import org.opendaylight.controller.cluster.access.commands.NotLeaderException;
import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
import org.opendaylight.controller.cluster.access.concepts.FailureEnvelope;
import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
}
}
+ private AbstractClientConnection<T> getConnection(final ResponseEnvelope<?> response) {
+ // Always called from actor context: no locking required
+ return connections.get(extractCookie(response.getMessage().getTarget()));
+ }
+
@SuppressWarnings("unchecked")
@Override
final ClientActorBehavior<T> onReceiveCommand(final Object command) {
}
private void onResponse(final ResponseEnvelope<?> response) {
- final long cookie = extractCookie(response.getMessage().getTarget());
- final AbstractClientConnection<T> connection = connections.get(cookie);
+ final AbstractClientConnection<T> connection = getConnection(response);
if (connection != null) {
connection.receiveResponse(response);
} else {
poison(cause);
return null;
}
+ if (cause instanceof NotLeaderException) {
+ final AbstractClientConnection<T> conn = getConnection(command);
+ if (conn instanceof ReconnectingClientConnection) {
+ // Already reconnecting, do not churn the logs
+ return this;
+ } else if (conn != null) {
+ LOG.info("{}: connection {} indicated no leadership, reconnecting it", persistenceId(), conn, cause);
+ return conn.reconnect(this);
+ }
+ }
return onRequestFailure(command);
}