import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
import org.opendaylight.controller.cluster.datastore.DatastoreContext;
*
* @param message
*/
- public void broadcast(final Object message){
+ public void broadcast(final Function<Short, Object> messageSupplier){
for(final String shardName : configuration.getAllShardNames()){
Future<PrimaryShardInfo> primaryFuture = findPrimaryShardAsync(shardName);
primaryFuture.onComplete(new OnComplete<PrimaryShardInfo>() {
@Override
public void onComplete(Throwable failure, PrimaryShardInfo primaryShardInfo) {
+ Object message = messageSupplier.apply(primaryShardInfo.getPrimaryShardVersion());
if(failure != null) {
LOG.warn("broadcast failed to send message {} to shard {}: {}",
message.getClass().getSimpleName(), shardName, failure);