+
+ @Nullable
+ @Override
+ public <T> RequestContext<T> createRequestContext() {
+ return new AbstractRequestContext<T>(deviceInfo.reserveXidForDeviceMessage()) {
+ @Override
+ public void close() {
+ }
+ };
+
+ }
+
+ private ListenableFuture<RpcResult<SetRoleOutput>> sendRoleChangeToDevice(final OfpRole newRole) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Sending new role {} to device {}", newRole, deviceInfo.getNodeId());
+ }
+
+ final Future<RpcResult<SetRoleOutput>> setRoleOutputFuture;
+
+ if (deviceInfo.getVersion() >= OFConstants.OFP_VERSION_1_3) {
+ final SetRoleInput setRoleInput = (new SetRoleInputBuilder()).setControllerRole(newRole)
+ .setNode(new NodeRef(deviceInfo.getNodeInstanceIdentifier())).build();
+
+ setRoleOutputFuture = this.salRoleService.setRole(setRoleInput);
+
+ final TimerTask timerTask = timeout -> {
+ if (!setRoleOutputFuture.isDone()) {
+ LOG.warn("New role {} was not propagated to device {} during {} sec", newRole, deviceInfo.getLOGValue(), SET_ROLE_TIMEOUT);
+ setRoleOutputFuture.cancel(true);
+ }
+ };
+
+ hashedWheelTimer.newTimeout(timerTask, SET_ROLE_TIMEOUT, TimeUnit.SECONDS);
+ } else {
+ LOG.info("Device: {} with version: {} does not support role", deviceInfo.getLOGValue(), deviceInfo.getVersion());
+ return Futures.immediateFuture(null);
+ }
+
+ return JdkFutureAdapters.listenInPoolThread(setRoleOutputFuture);
+ }
+
+ @Override
+ public ListenableFuture<RpcResult<SetRoleOutput>> makeDeviceSlave() {
+ return sendRoleChangeToDevice(OfpRole.BECOMESLAVE);
+ }
+
+ @Override
+ public void onStateAcquired(final ContextChainState state) {
+ hasState = true;
+ }
+
+ private class RpcResultFutureCallback implements FutureCallback<RpcResult<SetRoleOutput>> {
+
+ private final MastershipChangeListener mastershipChangeListener;
+
+ RpcResultFutureCallback(final MastershipChangeListener mastershipChangeListener) {
+ this.mastershipChangeListener = mastershipChangeListener;
+ }
+
+ @Override
+ public void onSuccess(@Nullable RpcResult<SetRoleOutput> setRoleOutputRpcResult) {
+ this.mastershipChangeListener.onMasterRoleAcquired(
+ deviceInfo,
+ ContextChainMastershipState.MASTER_ON_DEVICE
+ );
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Role MASTER was successfully set on device, node {}", deviceInfo.getLOGValue());
+ }
+ }
+
+ @Override
+ public void onFailure(final Throwable throwable) {
+ mastershipChangeListener.onNotAbleToStartMastershipMandatory(
+ deviceInfo,
+ "Was not able to set MASTER role on device");
+ }
+ }
+
+ private class DeviceFlowRegistryCallback implements FutureCallback<List<Optional<FlowCapableNode>>> {
+ private final ListenableFuture<List<Optional<FlowCapableNode>>> deviceFlowRegistryFill;
+ private final MastershipChangeListener mastershipChangeListener;
+
+ DeviceFlowRegistryCallback(
+ ListenableFuture<List<Optional<FlowCapableNode>>> deviceFlowRegistryFill,
+ MastershipChangeListener mastershipChangeListener) {
+ this.deviceFlowRegistryFill = deviceFlowRegistryFill;
+ this.mastershipChangeListener = mastershipChangeListener;
+ }
+
+ @Override
+ public void onSuccess(@Nullable List<Optional<FlowCapableNode>> result) {
+ if (LOG.isDebugEnabled()) {
+ // Count all flows we read from datastore for debugging purposes.
+ // This number do not always represent how many flows were actually added
+ // to DeviceFlowRegistry, because of possible duplicates.
+ long flowCount = Optional.fromNullable(result).asSet().stream()
+ .flatMap(Collection::stream)
+ .filter(Objects::nonNull)
+ .flatMap(flowCapableNodeOptional -> flowCapableNodeOptional.asSet().stream())
+ .filter(Objects::nonNull)
+ .filter(flowCapableNode -> Objects.nonNull(flowCapableNode.getTable()))
+ .flatMap(flowCapableNode -> flowCapableNode.getTable().stream())
+ .filter(Objects::nonNull)
+ .filter(table -> Objects.nonNull(table.getFlow()))
+ .flatMap(table -> table.getFlow().stream())
+ .filter(Objects::nonNull)
+ .count();
+
+ LOG.debug("Finished filling flow registry with {} flows for node: {}", flowCount, deviceInfo.getLOGValue());
+ }
+ this.mastershipChangeListener.onMasterRoleAcquired(deviceInfo, ContextChainMastershipState.INITIAL_FLOW_REGISTRY_FILL);
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ if (deviceFlowRegistryFill.isCancelled()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Cancelled filling flow registry with flows for node: {}", deviceInfo.getLOGValue());
+ }
+ } else {
+ LOG.warn("Failed filling flow registry with flows for node: {} with exception: {}", deviceInfo.getLOGValue(), t);
+ }
+ mastershipChangeListener.onNotAbleToStartMastership(
+ deviceInfo,
+ "Was not able to fill flow registry on device",
+ false);
+ }
+ }
+