+
+ private void stopXtrThread() {
+ if (xtrThread != null) {
+ xtrThread.stopRunning();
+ while (xtrThread.isRunning()) {
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException e) {
+ }
+ }
+ }
+ }
+
+ @Override
+ public Future<RpcResult<Void>> sendMapNotify(SendMapNotifyInput mapNotifyInput) {
+ LOG.trace("sendMapNotify called!!");
+ if (mapNotifyInput != null) {
+ ByteBuffer outBuffer = MapNotifySerializer.getInstance().serialize(mapNotifyInput.getMapNotify());
+ handleSerializedLispBuffer(mapNotifyInput.getTransportAddress(), outBuffer, MAP_NOTIFY);
+ } else {
+ LOG.warn("MapNotify was null");
+ return Futures.immediateFuture(RpcResultBuilder.<Void> failed().build());
+ }
+ return Futures.immediateFuture(RpcResultBuilder.<Void> success().build());
+ }
+
+ @Override
+ public Future<RpcResult<Void>> sendMapReply(SendMapReplyInput mapReplyInput) {
+ LOG.trace("sendMapReply called!!");
+ if (mapReplyInput != null) {
+ ByteBuffer outBuffer = MapReplySerializer.getInstance().serialize(mapReplyInput.getMapReply());
+ handleSerializedLispBuffer(mapReplyInput.getTransportAddress(), outBuffer, MAP_REPlY);
+ } else {
+ LOG.warn("MapReply was null");
+ return Futures.immediateFuture(RpcResultBuilder.<Void> failed().build());
+ }
+ return Futures.immediateFuture(RpcResultBuilder.<Void> success().build());
+ }
+
+ @Override
+ public Future<RpcResult<Void>> sendMapRequest(SendMapRequestInput mapRequestInput) {
+ LOG.trace("sendMapRequest called!!");
+ if (mapRequestInput != null) {
+ ByteBuffer outBuffer = MapRequestSerializer.getInstance().serialize(mapRequestInput.getMapRequest());
+ handleSerializedLispBuffer(mapRequestInput.getTransportAddress(), outBuffer, MAP_REQUEST);
+ } else {
+ LOG.debug("MapRequest was null");
+ return Futures.immediateFuture(RpcResultBuilder.<Void> failed().build());
+ }
+ return Futures.immediateFuture(RpcResultBuilder.<Void> success().build());
+ }
+
+ @Override
+ public void shouldListenOnXtrPort(boolean shouldListenOnXtrPort) {
+ listenOnXtrPort = shouldListenOnXtrPort;
+ if (listenOnXtrPort) {
+ LOG.debug("restarting xtr thread");
+ restartXtrThread();
+ } else {
+ LOG.debug("terminating thread");
+ stopXtrThread();
+ }
+ }
+
+ @Override
+ public void setXtrPort(int port) {
+ this.xtrPort = port;
+ if (listenOnXtrPort) {
+ restartXtrThread();
+ }
+ }