final class SinkSingletonService extends ChannelInitializer<SocketChannel> implements ClusterSingletonService {
private static final Logger LOG = LoggerFactory.getLogger(SinkSingletonService.class);
- private static final ServiceGroupIdentifier SGID =
- ServiceGroupIdentifier.create(SinkSingletonService.class.getName());
+ private static final ServiceGroupIdentifier SGID = new ServiceGroupIdentifier(SinkSingletonService.class.getName());
// TODO: allow different trees?
private static final DOMDataTreeIdentifier TREE = new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION,
- YangInstanceIdentifier.empty());
+ YangInstanceIdentifier.of());
private static long CHANNEL_CLOSE_TIMEOUT_S = 10;
private static final ByteBuf TREE_REQUEST;
@Override
public synchronized void instantiateServiceInstance() {
LOG.info("Replication sink started with source {}", sourceAddress);
- this.bs = bootstrapSupport.newBootstrap();
+ bs = bootstrapSupport.newBootstrap();
doConnect();
}
}
private synchronized void channelClosed(final ChannelFuture completedFuture, final ScheduledExecutorService group) {
- if (futureChannel != null && futureChannel.channel() == completedFuture.channel()) {
- if (!closingInstance) {
- LOG.info("Channel {} lost connection to source {}, reconnecting in {}", completedFuture.channel(),
- sourceAddress, reconnectDelay.getSeconds());
- group.schedule(() -> {
- reconnect();
- }, reconnectDelay.toNanos(), TimeUnit.NANOSECONDS);
- }
+ if (futureChannel != null && futureChannel.channel() == completedFuture.channel() && !closingInstance) {
+ LOG.info("Channel {} lost connection to source {}, reconnecting in {}", completedFuture.channel(),
+ sourceAddress, reconnectDelay.getSeconds());
+ group.schedule(this::reconnect, reconnectDelay.toNanos(), TimeUnit.NANOSECONDS);
}
}