X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2Fshardmanager%2FShardManager.java;h=56bdd7f8ea2cb6329dbdfe9cf79f38759f0375d0;hp=d3fb58e9d15e66e5c25ca40ce60c546cfb6d866e;hb=35235f427f3a056f85fe83ddd1133e67540328f7;hpb=634dfac8eead60f443bf75e749c70d1f2bb29198 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java index d3fb58e9d1..56bdd7f8ea 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java @@ -115,13 +115,13 @@ import org.opendaylight.controller.cluster.sharding.messages.PrefixShardRemoved; import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener; import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; import org.opendaylight.yangtools.concepts.ListenerRegistration; +import org.opendaylight.yangtools.concepts.Registration; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.concurrent.ExecutionContext; import scala.concurrent.Future; -import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; /** @@ -171,6 +171,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { private final Map> shardActorsStopping = new HashMap<>(); + private final Set> shardAvailabilityCallbacks = new HashSet<>(); + private final String persistenceId; private final AbstractDataStore dataStore; @@ -301,6 +303,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { onGetShardRole((GetShardRole) message); } else if (message instanceof RunnableMessage) { ((RunnableMessage)message).run(); + } else if (message instanceof RegisterForShardAvailabilityChanges) { + onRegisterForShardAvailabilityChanges((RegisterForShardAvailabilityChanges)message); } else if (message instanceof DeleteSnapshotsFailure) { LOG.warn("{}: Failed to delete prior snapshots", persistenceId(), ((DeleteSnapshotsFailure) message).cause()); @@ -315,6 +319,16 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } } + private void onRegisterForShardAvailabilityChanges(RegisterForShardAvailabilityChanges message) { + LOG.debug("{}: onRegisterForShardAvailabilityChanges: {}", persistenceId(), message); + + final Consumer callback = message.getCallback(); + shardAvailabilityCallbacks.add(callback); + + getSender().tell(new Status.Success((Registration) + () -> executeInSelf(() -> shardAvailabilityCallbacks.remove(callback))), self()); + } + private void onGetShardRole(final GetShardRole message) { LOG.debug("{}: onGetShardRole for shard: {}", persistenceId(), message.getName()); @@ -763,6 +777,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { shardInformation.setLeaderVersion(leaderStateChanged.getLeaderPayloadVersion()); if (shardInformation.setLeaderId(leaderStateChanged.getLeaderId())) { primaryShardInfoCache.remove(shardInformation.getShardName()); + + notifyShardAvailabilityCallbacks(shardInformation); } checkReady(); @@ -771,6 +787,10 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } } + private void notifyShardAvailabilityCallbacks(ShardInformation shardInformation) { + shardAvailabilityCallbacks.forEach(callback -> callback.accept(shardInformation.getShardName())); + } + private void onShardNotInitializedTimeout(final ShardNotInitializedTimeout message) { ShardInformation shardInfo = message.getShardInfo(); @@ -927,7 +947,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } LOG.debug("{}: Scheduling {} ms timer to wait for shard {}", persistenceId(), timeout.toMillis(), - shardInformation.getShardName()); + shardInformation); Cancellable timeoutSchedule = getContext().system().scheduler().scheduleOnce( timeout, getSelf(), @@ -1051,6 +1071,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { info.setLeaderAvailable(false); primaryShardInfoCache.remove(info.getShardName()); + + notifyShardAvailabilityCallbacks(info); } info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf()); @@ -1302,7 +1324,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { @Override public SupervisorStrategy supervisorStrategy() { - return new OneForOneStrategy(10, Duration.create("1 minute"), + return new OneForOneStrategy(10, FiniteDuration.create(1, TimeUnit.MINUTES), (Function) t -> { LOG.warn("Supervisor Strategy caught unexpected exception - resuming", t); return SupervisorStrategy.resume();