import org.opendaylight.controller.cluster.sharding.messages.InitConfigListener;
import org.opendaylight.controller.cluster.sharding.messages.PrefixShardCreated;
import org.opendaylight.controller.cluster.sharding.messages.PrefixShardRemoved;
-import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.concepts.Registration;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
-import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
/**
private final Map<String, CompositeOnComplete<Boolean>> shardActorsStopping = new HashMap<>();
+ private final Set<Consumer<String>> shardAvailabilityCallbacks = new HashSet<>();
+
private final String persistenceId;
private final AbstractDataStore dataStore;
- private ListenerRegistration<DOMDataTreeChangeListener> configListenerReg = null;
private PrefixedShardConfigUpdateHandler configUpdateHandler;
ShardManager(final AbstractShardManagerCreator<?> builder) {
LOG.info("Stopping ShardManager {}", persistenceId());
shardManagerMBean.unregisterMBean();
-
- if (configListenerReg != null) {
- configListenerReg.close();
- configListenerReg = null;
- }
}
@Override
onGetShardRole((GetShardRole) message);
} else if (message instanceof RunnableMessage) {
((RunnableMessage)message).run();
+ } else if (message instanceof RegisterForShardAvailabilityChanges) {
+ onRegisterForShardAvailabilityChanges((RegisterForShardAvailabilityChanges)message);
} else if (message instanceof DeleteSnapshotsFailure) {
LOG.warn("{}: Failed to delete prior snapshots", persistenceId(),
((DeleteSnapshotsFailure) message).cause());
}
}
+ private void onRegisterForShardAvailabilityChanges(final RegisterForShardAvailabilityChanges message) {
+ LOG.debug("{}: onRegisterForShardAvailabilityChanges: {}", persistenceId(), message);
+
+ final Consumer<String> callback = message.getCallback();
+ shardAvailabilityCallbacks.add(callback);
+
+ getSender().tell(new Status.Success((Registration)
+ () -> executeInSelf(() -> shardAvailabilityCallbacks.remove(callback))), self());
+ }
+
private void onGetShardRole(final GetShardRole message) {
LOG.debug("{}: onGetShardRole for shard: {}", persistenceId(), message.getName());
shardInformation.setLeaderVersion(leaderStateChanged.getLeaderPayloadVersion());
if (shardInformation.setLeaderId(leaderStateChanged.getLeaderId())) {
primaryShardInfoCache.remove(shardInformation.getShardName());
+
+ notifyShardAvailabilityCallbacks(shardInformation);
}
checkReady();
}
}
+ private void notifyShardAvailabilityCallbacks(final ShardInformation shardInformation) {
+ shardAvailabilityCallbacks.forEach(callback -> callback.accept(shardInformation.getShardName()));
+ }
+
private void onShardNotInitializedTimeout(final ShardNotInitializedTimeout message) {
ShardInformation shardInfo = message.getShardInfo();
message.getSender().tell(createNotInitializedException(shardInfo.getShardId()), getSelf());
} else {
LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(), shardInfo.getShardName());
- message.getSender().tell(createNoShardLeaderException(shardInfo.getShardId()), getSelf());
+ message.getSender().tell(new NoShardLeaderException(shardInfo.getShardId()), getSelf());
}
}
}
LOG.debug("{}: Scheduling {} ms timer to wait for shard {}", persistenceId(), timeout.toMillis(),
- shardInformation.getShardName());
+ shardInformation);
Cancellable timeoutSchedule = getContext().system().scheduler().scheduleOnce(
timeout, getSelf(),
} else {
LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(),
shardInformation.getShardName());
- getSender().tell(createNoShardLeaderException(shardInformation.getShardId()), getSelf());
+ getSender().tell(new NoShardLeaderException(shardInformation.getShardId()), getSelf());
}
return;
getSender().tell(messageSupplier.get(), getSelf());
}
- private static NoShardLeaderException createNoShardLeaderException(final ShardIdentifier shardId) {
- return new NoShardLeaderException(null, shardId.toString());
- }
-
private static NotInitializedException createNotInitializedException(final ShardIdentifier shardId) {
return new NotInitializedException(String.format(
"Found primary shard %s but it's not initialized yet. Please try again later", shardId));
info.setLeaderAvailable(false);
primaryShardInfoCache.remove(info.getShardName());
+
+ notifyShardAvailabilityCallbacks(info);
}
info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
@Override
public SupervisorStrategy supervisorStrategy() {
- return new OneForOneStrategy(10, Duration.create("1 minute"),
+ return new OneForOneStrategy(10, FiniteDuration.create(1, TimeUnit.MINUTES),
(Function<Throwable, Directive>) t -> {
LOG.warn("Supervisor Strategy caught unexpected exception - resuming", t);
return SupervisorStrategy.resume();
+ "Possible causes - there was a problem replicating the data or shard leadership changed "
+ "while replicating the shard data", leaderPath, shardId.getShardName()));
case NO_LEADER:
- return createNoShardLeaderException(shardId);
+ return new NoShardLeaderException(shardId);
case NOT_SUPPORTED:
return new UnsupportedOperationException(String.format("%s request is not supported for shard %s",
serverChange.getSimpleName(), shardId.getShardName()));