@Override
public void close() throws Exception {
if (registration != null) {
+ LOG.info("Unregistering clustering MASTER services for node {}", this.deviceContext.getDeviceInfo().getLOGValue());
registration.close();
registration = null;
}
@Override
public void registerService(final ClusterSingletonServiceProvider singletonServiceProvider) {
+ LOG.info("Registering clustering MASTER services for node {}", this.deviceContext.getDeviceInfo().getLOGValue());
+
//lifecycle service -> device context -> statistics context -> rpc context -> role context -> lifecycle service
this.clusterInitializationPhaseHandler = deviceContext;
this.deviceContext.setLifecycleInitializationPhaseHandler(this.statContext);
}
private void fillDeviceFlowRegistry() {
- // Fill device flow registry with flows from datastore
- final ListenableFuture<List<Optional<FlowCapableNode>>> deviceFlowRegistryFill = deviceContext.getDeviceFlowRegistry().fill();
-
- // Start statistics scheduling only after we finished initializing device flow registry
- Futures.addCallback(deviceFlowRegistryFill, new FutureCallback<List<Optional<FlowCapableNode>>>() {
- @Override
- public void onSuccess(@Nullable List<Optional<FlowCapableNode>> result) {
- if (LOG.isDebugEnabled()) {
- // Count all flows we read from datastore for debugging purposes.
- // This number do not always represent how many flows were actually added
- // to DeviceFlowRegistry, because of possible duplicates.
- long flowCount = Optional.fromNullable(result).asSet().stream()
- .flatMap(Collection::stream)
- .filter(Objects::nonNull)
- .flatMap(flowCapableNodeOptional -> flowCapableNodeOptional.asSet().stream())
- .filter(Objects::nonNull)
- .filter(flowCapableNode -> Objects.nonNull(flowCapableNode.getTable()))
- .flatMap(flowCapableNode -> flowCapableNode.getTable().stream())
- .filter(Objects::nonNull)
- .filter(table -> Objects.nonNull(table.getFlow()))
- .flatMap(table -> table.getFlow().stream())
- .filter(Objects::nonNull)
- .count();
-
- LOG.debug("Finished filling flow registry with {} flows for node: {}", flowCount, deviceContext.getDeviceInfo().getLOGValue());
- }
- }
-
- @Override
- public void onFailure(Throwable t) {
- if (deviceFlowRegistryFill.isCancelled()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Cancelled filling flow registry with flows for node: {}", deviceContext.getDeviceInfo().getLOGValue());
- }
- } else {
- LOG.warn("Failed filling flow registry with flows for node: {} with exception: {}", deviceContext.getDeviceInfo().getLOGValue(), t);
- }
- }
- });
+
+ final ListenableFuture<List<Optional<FlowCapableNode>>> deviceFlowRegistryFill = deviceContext.getDeviceFlowRegistry().fill();
+ Futures.addCallback(deviceFlowRegistryFill, new DeviceFlowRegistryCallback(deviceFlowRegistryFill));
}
@Override
fillDeviceFlowRegistry();
return true;
}
+
+ private class DeviceFlowRegistryCallback implements FutureCallback<List<Optional<FlowCapableNode>>> {
+ private final ListenableFuture<List<Optional<FlowCapableNode>>> deviceFlowRegistryFill;
+
+ public DeviceFlowRegistryCallback(ListenableFuture<List<Optional<FlowCapableNode>>> deviceFlowRegistryFill) {
+ this.deviceFlowRegistryFill = deviceFlowRegistryFill;
+ }
+
+ @Override
+ public void onSuccess(@Nullable List<Optional<FlowCapableNode>> result) {
+ if (LOG.isDebugEnabled()) {
+ // Count all flows we read from datastore for debugging purposes.
+ // This number do not always represent how many flows were actually added
+ // to DeviceFlowRegistry, because of possible duplicates.
+ long flowCount = Optional.fromNullable(result).asSet().stream()
+ .flatMap(Collection::stream)
+ .filter(Objects::nonNull)
+ .flatMap(flowCapableNodeOptional -> flowCapableNodeOptional.asSet().stream())
+ .filter(Objects::nonNull)
+ .filter(flowCapableNode -> Objects.nonNull(flowCapableNode.getTable()))
+ .flatMap(flowCapableNode -> flowCapableNode.getTable().stream())
+ .filter(Objects::nonNull)
+ .filter(table -> Objects.nonNull(table.getFlow()))
+ .flatMap(table -> table.getFlow().stream())
+ .filter(Objects::nonNull)
+ .count();
+
+ LOG.debug("Finished filling flow registry with {} flows for node: {}", flowCount, deviceContext.getDeviceInfo().getLOGValue());
+ }
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ if (deviceFlowRegistryFill.isCancelled()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Cancelled filling flow registry with flows for node: {}", deviceContext.getDeviceInfo().getLOGValue());
+ }
+ } else {
+ LOG.warn("Failed filling flow registry with flows for node: {} with exception: {}", deviceContext.getDeviceInfo().getLOGValue(), t);
+ }
+ }
+ }
}