import com.google.common.base.Strings;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
import org.opendaylight.controller.cluster.datastore.DatastoreContext;
return future.transform(new Mapper<Object, PrimaryShardInfo>() {
@Override
- public PrimaryShardInfo checkedApply(Object response) throws Exception {
+ public PrimaryShardInfo checkedApply(Object response) throws UnknownMessageException {
if(response instanceof RemotePrimaryShardFound) {
LOG.debug("findPrimaryShardAsync received: {}", response);
RemotePrimaryShardFound found = (RemotePrimaryShardFound)response;
return clusterWrapper;
}
- public String getCurrentMemberName(){
+ public MemberName getCurrentMemberName(){
return clusterWrapper.getCurrentMemberName();
}
/**
* Send the message to each and every shard
- *
- * @param message
*/
- public void broadcast(final Function<Short, Object> messageSupplier){
+ public void broadcast(final Function<Short, Object> messageSupplier, Class<?> messageClass){
for(final String shardName : configuration.getAllShardNames()){
Future<PrimaryShardInfo> primaryFuture = findPrimaryShardAsync(shardName);
primaryFuture.onComplete(new OnComplete<PrimaryShardInfo>() {
@Override
public void onComplete(Throwable failure, PrimaryShardInfo primaryShardInfo) {
- Object message = messageSupplier.apply(primaryShardInfo.getPrimaryShardVersion());
if(failure != null) {
LOG.warn("broadcast failed to send message {} to shard {}: {}",
- message.getClass().getSimpleName(), shardName, failure);
+ messageClass.getSimpleName(), shardName, failure);
} else {
+ Object message = messageSupplier.apply(primaryShardInfo.getPrimaryShardVersion());
primaryShardInfo.getPrimaryShardActor().tell(message, ActorRef.noSender());
}
}