context.executeInActor(current -> {
final double time = beenOpen * 1.0 / 1_000_000_000;
entry.complete(entry.getRequest().toRequestFailure(
- new RequestTimeoutException("Timed out after " + time + "seconds")));
+ new RequestTimeoutException("Timed out after " + time + " seconds")));
return current;
});
}
import akka.actor.ActorRef;
import java.util.concurrent.CompletionStage;
+import java.util.function.Consumer;
import javax.annotation.Nonnull;
+import org.opendaylight.yangtools.concepts.Registration;
/**
* Caching resolver which resolves a cookie to a leader {@link ActorRef}. This class needs to be specialized by the
*
* @author Robert Varga
*/
-public abstract class BackendInfoResolver<T extends BackendInfo> {
+public abstract class BackendInfoResolver<T extends BackendInfo> implements AutoCloseable {
/**
* Request resolution of a particular backend identified by a cookie. This request can be satisfied from the cache.
*
*/
@Nonnull
public abstract CompletionStage<? extends T> refreshBackendInfo(@Nonnull Long cookie, @Nonnull T staleInfo);
+
+ /**
+ * Registers a callback to be notified when BackendInfo that may have been previously obtained is now stale and
+ * should be refreshed.
+ *
+ * @param callback the callback that takes the backend cookie whose BackendInfo is now stale.
+ * @return a Registration
+ */
+ @Nonnull
+ public abstract Registration notifyWhenBackendInfoIsStale(Consumer<Long> callback);
+
+ @Override
+ public void close() {
+ }
}
import org.opendaylight.controller.cluster.messaging.MessageAssembler;
import org.opendaylight.yangtools.concepts.Identifiable;
import org.opendaylight.yangtools.concepts.Identifier;
+import org.opendaylight.yangtools.concepts.Registration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.duration.FiniteDuration;
private final InversibleLock connectionsLock = new InversibleLock();
private final BackendInfoResolver<T> resolver;
private final MessageAssembler responseMessageAssembler;
+ private final Registration staleBackendInfoReg;
protected ClientActorBehavior(@Nonnull final ClientActorContext context,
@Nonnull final BackendInfoResolver<T> resolver) {
.fileBackedStreamFactory(new FileBackedOutputStreamFactory(config.getFileBackedStreamingThreshold(),
config.getTempFileDirectory()))
.assembledMessageCallback((message, sender) -> context.self().tell(message, sender)).build();
+
+ staleBackendInfoReg = resolver.notifyWhenBackendInfoIsStale(shard -> {
+ context().executeInActor(behavior -> {
+ LOG.debug("BackendInfo for shard {} is now stale", shard);
+ final AbstractClientConnection<T> conn = connections.get(shard);
+ if (conn instanceof ConnectedClientConnection) {
+ conn.reconnect(this, new BackendStaleException(shard));
+ }
+ return behavior;
+ });
+ });
}
@Override
@Override
public void close() {
+ super.close();
responseMessageAssembler.close();
+ staleBackendInfoReg.close();
}
/**
return behavior;
}));
}
+
+ private static class BackendStaleException extends RequestException {
+ private static final long serialVersionUID = 1L;
+
+ BackendStaleException(final Long shard) {
+ super("Backend for shard " + shard + " is stale");
+ }
+
+ @Override
+ public boolean isRetriable() {
+ return false;
+ }
+ }
}
}
@Override
- public final void close() {
+ public void close() {
+ super.close();
context().executeInActor(this::shutdown);
}
import akka.util.Timeout;
import com.google.common.base.Preconditions;
import com.google.common.primitives.UnsignedLong;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.yangtools.concepts.Registration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Function1;
private final AtomicLong nextSessionId = new AtomicLong();
private final Function1<ActorRef, ?> connectFunction;
private final ActorContext actorContext;
+ private final Set<Consumer<Long>> staleBackendInfoCallbacks = ConcurrentHashMap.newKeySet();
// FIXME: we really need just ActorContext.findPrimaryShardAsync()
AbstractShardBackendResolver(final ClientIdentifier clientId, final ActorContext actorContext) {
ABIVersion.current()));
}
+ @Override
+ public Registration notifyWhenBackendInfoIsStale(final Consumer<Long> callback) {
+ staleBackendInfoCallbacks.add(callback);
+ return () -> staleBackendInfoCallbacks.remove(callback);
+ }
+
+ protected void notifyStaleBackendInfoCallbacks(Long cookie) {
+ staleBackendInfoCallbacks.forEach(callback -> callback.accept(cookie));
+ }
+
+ protected ActorContext actorContext() {
+ return actorContext;
+ }
+
protected final void flushCache(final String shardName) {
actorContext.getPrimaryShardInfoCache().remove(shardName);
}
Long resolveShardForPath(final YangInstanceIdentifier path) {
return pathToShard.apply(path);
}
+
+ @Override
+ public void close() {
+ super.close();
+ resolver().close();
+ }
}
*/
package org.opendaylight.controller.cluster.databroker.actors.dds;
-import com.google.common.base.Preconditions;
+import static akka.pattern.Patterns.ask;
+
+import akka.dispatch.ExecutionContexts;
+import akka.dispatch.OnComplete;
+import akka.util.Timeout;
import com.google.common.collect.BiMap;
import com.google.common.collect.ImmutableBiMap;
import com.google.common.collect.ImmutableBiMap.Builder;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
import org.opendaylight.controller.cluster.access.client.BackendInfoResolver;
import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
+import org.opendaylight.controller.cluster.datastore.shardmanager.RegisterForShardAvailabilityChanges;
import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.yangtools.concepts.Registration;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import scala.concurrent.Future;
/**
* {@link BackendInfoResolver} implementation for static shard configuration based on ShardManager. Each string-named
private static final Logger LOG = LoggerFactory.getLogger(ModuleShardBackendResolver.class);
private final ConcurrentMap<Long, ShardState> backends = new ConcurrentHashMap<>();
- private final ActorContext actorContext;
+
+ private final Future<Registration> shardAvailabilityChangesRegFuture;
@GuardedBy("this")
private long nextShard = 1;
// FIXME: we really need just ActorContext.findPrimaryShardAsync()
ModuleShardBackendResolver(final ClientIdentifier clientId, final ActorContext actorContext) {
super(clientId, actorContext);
- this.actorContext = Preconditions.checkNotNull(actorContext);
+
+ shardAvailabilityChangesRegFuture = ask(actorContext.getShardManager(), new RegisterForShardAvailabilityChanges(
+ this::onShardAvailabilityChange), Timeout.apply(60, TimeUnit.MINUTES))
+ .map(reply -> (Registration)reply, ExecutionContexts.global());
+
+ shardAvailabilityChangesRegFuture.onComplete(new OnComplete<Registration>() {
+ @Override
+ public void onComplete(Throwable failure, Registration reply) {
+ if (failure != null) {
+ LOG.error("RegisterForShardAvailabilityChanges failed", failure);
+ }
+ }
+ }, ExecutionContexts.global());
+ }
+
+ private void onShardAvailabilityChange(String shardName) {
+ LOG.debug("onShardAvailabilityChange for {}", shardName);
+
+ Long cookie = shards.get(shardName);
+ if (cookie == null) {
+ LOG.debug("No shard cookie found for {}", shardName);
+ return;
+ }
+
+ notifyStaleBackendInfoCallbacks(cookie);
}
Long resolveShardForPath(final YangInstanceIdentifier path) {
- final String shardName = actorContext.getShardStrategyFactory().getStrategy(path).findShard(path);
+ final String shardName = actorContext().getShardStrategyFactory().getStrategy(path).findShard(path);
Long cookie = shards.get(shardName);
if (cookie == null) {
synchronized (this) {
return cookie;
}
-
@Override
public CompletionStage<ShardBackendInfo> getBackendInfo(final Long cookie) {
/*
return getBackendInfo(cookie);
}
+
+ @Override
+ public void close() {
+ shardAvailabilityChangesRegFuture.onComplete(new OnComplete<Registration>() {
+ @Override
+ public void onComplete(Throwable failure, Registration reply) {
+ reply.close();
+ }
+ }, ExecutionContexts.global());
+ }
}
--- /dev/null
+/*
+ * Copyright (c) 2018 Red Hat, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.shardmanager;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.function.Consumer;
+
+/**
+ * Local ShardManager message to register a callback to be notified of shard availability changes. The reply to
+ * this message is a {@link org.opendaylight.yangtools.concepts.Registration} instance wrapped in a
+ * {@link akka.actor.Status.Success}.
+ *
+ * @author Thomas Pantelis
+ */
+public class RegisterForShardAvailabilityChanges {
+ private final Consumer<String> callback;
+
+ public RegisterForShardAvailabilityChanges(Consumer<String> callback) {
+ this.callback = requireNonNull(callback);
+ }
+
+ public Consumer<String> getCallback() {
+ return callback;
+ }
+}
void setSchemaContext(final SchemaContext schemaContext) {
schemaContextProvider.set(Preconditions.checkNotNull(schemaContext));
}
+
+ @Override
+ public String toString() {
+ return "ShardInformation [shardId=" + shardId + ", leaderAvailable=" + leaderAvailable + ", actorInitialized="
+ + actorInitialized + ", followerSyncStatus=" + followerSyncStatus + ", role=" + role + ", leaderId="
+ + leaderId + ", activeMember=" + activeMember + "]";
+ }
+
+
}
import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.concepts.Registration;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
private final Map<String, CompositeOnComplete<Boolean>> shardActorsStopping = new HashMap<>();
+ private final Set<Consumer<String>> shardAvailabilityCallbacks = new HashSet<>();
+
private final String persistenceId;
private final AbstractDataStore dataStore;
onGetShardRole((GetShardRole) message);
} else if (message instanceof RunnableMessage) {
((RunnableMessage)message).run();
+ } else if (message instanceof RegisterForShardAvailabilityChanges) {
+ onRegisterForShardAvailabilityChanges((RegisterForShardAvailabilityChanges)message);
} else if (message instanceof DeleteSnapshotsFailure) {
LOG.warn("{}: Failed to delete prior snapshots", persistenceId(),
((DeleteSnapshotsFailure) message).cause());
}
}
+ private void onRegisterForShardAvailabilityChanges(RegisterForShardAvailabilityChanges message) {
+ LOG.debug("{}: onRegisterForShardAvailabilityChanges: {}", persistenceId(), message);
+
+ final Consumer<String> callback = message.getCallback();
+ shardAvailabilityCallbacks.add(callback);
+
+ getSender().tell(new Status.Success((Registration)
+ () -> executeInSelf(() -> shardAvailabilityCallbacks.remove(callback))), self());
+ }
+
private void onGetShardRole(final GetShardRole message) {
LOG.debug("{}: onGetShardRole for shard: {}", persistenceId(), message.getName());
shardInformation.setLeaderVersion(leaderStateChanged.getLeaderPayloadVersion());
if (shardInformation.setLeaderId(leaderStateChanged.getLeaderId())) {
primaryShardInfoCache.remove(shardInformation.getShardName());
+
+ notifyShardAvailabilityCallbacks(shardInformation);
}
checkReady();
}
}
+ private void notifyShardAvailabilityCallbacks(ShardInformation shardInformation) {
+ shardAvailabilityCallbacks.forEach(callback -> callback.accept(shardInformation.getShardName()));
+ }
+
private void onShardNotInitializedTimeout(final ShardNotInitializedTimeout message) {
ShardInformation shardInfo = message.getShardInfo();
}
LOG.debug("{}: Scheduling {} ms timer to wait for shard {}", persistenceId(), timeout.toMillis(),
- shardInformation.getShardName());
+ shardInformation);
Cancellable timeoutSchedule = getContext().system().scheduler().scheduleOnce(
timeout, getSelf(),
info.setLeaderAvailable(false);
primaryShardInfoCache.remove(info.getShardName());
+
+ notifyShardAvailabilityCallbacks(info);
}
info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
package org.opendaylight.controller.cluster.databroker.actors.dds;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
+import akka.actor.Status;
import akka.testkit.TestProbe;
import akka.testkit.javadsl.TestKit;
+import com.google.common.util.concurrent.Uninterruptibles;
import java.util.Collections;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.opendaylight.controller.cluster.access.concepts.MemberName;
import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
+import org.opendaylight.controller.cluster.datastore.shardmanager.RegisterForShardAvailabilityChanges;
+import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategy;
import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
+import org.opendaylight.yangtools.concepts.Registration;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
import scala.concurrent.Promise;
private ActorSystem system;
private ModuleShardBackendResolver moduleShardBackendResolver;
private TestProbe contextProbe;
+ private TestProbe shardManagerProbe;
@Mock
private ShardStrategyFactory shardStrategyFactory;
MockitoAnnotations.initMocks(this);
system = ActorSystem.apply();
contextProbe = new TestProbe(system, "context");
+
+ shardManagerProbe = new TestProbe(system, "ShardManager");
+
final ActorContext actorContext = createActorContextMock(system, contextProbe.ref());
+ when(actorContext.getShardManager()).thenReturn(shardManagerProbe.ref());
+
moduleShardBackendResolver = new ModuleShardBackendResolver(CLIENT_ID, actorContext);
when(actorContext.getShardStrategyFactory()).thenReturn(shardStrategyFactory);
when(shardStrategyFactory.getStrategy(YangInstanceIdentifier.EMPTY)).thenReturn(shardStrategy);
@Test
public void testResolveShardForPathNonNullCookie() {
- when(shardStrategy.findShard(YangInstanceIdentifier.EMPTY)).thenReturn("default");
+ when(shardStrategy.findShard(YangInstanceIdentifier.EMPTY)).thenReturn(DefaultShardStrategy.DEFAULT_SHARD);
final Long cookie = moduleShardBackendResolver.resolveShardForPath(YangInstanceIdentifier.EMPTY);
Assert.assertEquals(0L, cookie.longValue());
}
final ShardBackendInfo shardBackendInfo = TestUtils.getWithTimeout(stage.toCompletableFuture());
Assert.assertEquals(0L, shardBackendInfo.getCookie().longValue());
Assert.assertEquals(dataTree, shardBackendInfo.getDataTree().get());
- Assert.assertEquals("default", shardBackendInfo.getShardName());
+ Assert.assertEquals(DefaultShardStrategy.DEFAULT_SHARD, shardBackendInfo.getShardName());
}
@Test
Assert.assertEquals(refreshedBackendProbe.ref(), refreshedBackendInfo.getActor());
}
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testNotifyWhenBackendInfoIsStale() {
+ final RegisterForShardAvailabilityChanges regMessage =
+ shardManagerProbe.expectMsgClass(RegisterForShardAvailabilityChanges.class);
+ Registration mockReg = mock(Registration.class);
+ shardManagerProbe.reply(new Status.Success(mockReg));
+
+ Consumer<Long> mockCallback = mock(Consumer.class);
+ final Registration callbackReg = moduleShardBackendResolver.notifyWhenBackendInfoIsStale(mockCallback);
+
+ regMessage.getCallback().accept(DefaultShardStrategy.DEFAULT_SHARD);
+ verify(mockCallback, timeout(5000)).accept(Long.valueOf(0));
+
+ reset(mockCallback);
+ callbackReg.close();
+
+ regMessage.getCallback().accept(DefaultShardStrategy.DEFAULT_SHARD);
+ Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
+ verifyNoMoreInteractions(mockCallback);
+ }
+
private static ActorContext createActorContextMock(final ActorSystem system, final ActorRef actor) {
final ActorContext mock = mock(ActorContext.class);
final Promise<PrimaryShardInfo> promise = new scala.concurrent.impl.Promise.DefaultPromise<>();
final ActorSelection selection = system.actorSelection(actor.path());
final PrimaryShardInfo shardInfo = new PrimaryShardInfo(selection, (short) 0);
promise.success(shardInfo);
- when(mock.findPrimaryShardAsync("default")).thenReturn(promise.future());
+ when(mock.findPrimaryShardAsync(DefaultShardStrategy.DEFAULT_SHARD)).thenReturn(promise.future());
+ when(mock.getClientDispatcher()).thenReturn(system.dispatchers().defaultGlobalDispatcher());
return mock;
}
}
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.junit.Test;
import org.mockito.Mockito;
import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.yangtools.concepts.Registration;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.Await;
import scala.concurrent.Future;
+import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
public class ShardManagerTest extends AbstractShardManagerTest {
};
}
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testRegisterForShardLeaderChanges() {
+ LOG.info("testRegisterForShardLeaderChanges starting");
+
+ final String memberId1 = "member-1-shard-default-" + shardMrgIDSuffix;
+ final String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
+ final TestKit kit = new TestKit(getSystem());
+ final ActorRef shardManager = actorFactory.createActor(newPropsShardMgrWithMockShardActor());
+
+ shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), kit.getRef());
+ shardManager.tell(new ActorInitialized(), mockShardActor);
+
+ final Consumer<String> mockCallback = mock(Consumer.class);
+ shardManager.tell(new RegisterForShardAvailabilityChanges(mockCallback), kit.getRef());
+
+ final Success reply = kit.expectMsgClass(Duration.apply(5, TimeUnit.SECONDS), Success.class);
+ final Registration reg = (Registration) reply.status();
+
+ final DataTree mockDataTree = mock(DataTree.class);
+ shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId1, mockDataTree,
+ DataStoreVersions.CURRENT_VERSION), mockShardActor);
+
+ verify(mockCallback, timeout(5000)).accept("default");
+
+ reset(mockCallback);
+ shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId1, mockDataTree,
+ DataStoreVersions.CURRENT_VERSION), mockShardActor);
+
+ Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
+ verifyNoMoreInteractions(mockCallback);
+
+ shardManager.tell(new ShardLeaderStateChanged(memberId1, null, mockDataTree,
+ DataStoreVersions.CURRENT_VERSION), mockShardActor);
+
+ verify(mockCallback, timeout(5000)).accept("default");
+
+ reset(mockCallback);
+ shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId2, mockDataTree,
+ DataStoreVersions.CURRENT_VERSION), mockShardActor);
+
+ verify(mockCallback, timeout(5000)).accept("default");
+
+ reset(mockCallback);
+ reg.close();
+
+ shardManager.tell(new ShardLeaderStateChanged(memberId1, memberId1, mockDataTree,
+ DataStoreVersions.CURRENT_VERSION), mockShardActor);
+
+ Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
+ verifyNoMoreInteractions(mockCallback);
+
+ LOG.info("testRegisterForShardLeaderChanges ending");
+ }
+
public static class TestShardManager extends ShardManager {
private final CountDownLatch recoveryComplete = new CountDownLatch(1);
private final CountDownLatch snapshotPersist = new CountDownLatch(1);