*/
package org.opendaylight.openflowplugin.impl.lifecycle;
+import static java.util.Objects.requireNonNull;
+
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.List;
-import java.util.Objects;
import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
-import javax.annotation.Nonnull;
-import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
-import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
+import org.eclipse.jdt.annotation.NonNull;
+import org.opendaylight.mdsal.singleton.api.ClusterSingletonServiceProvider;
+import org.opendaylight.mdsal.singleton.api.ServiceGroupIdentifier;
import org.opendaylight.openflowplugin.api.openflow.OFPContext;
import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipWatcher;
import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainState;
import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainStateListener;
+import org.opendaylight.openflowplugin.api.openflow.lifecycle.DeviceInitializationContext;
import org.opendaylight.openflowplugin.api.openflow.lifecycle.GuardedContext;
import org.opendaylight.openflowplugin.api.openflow.lifecycle.ReconciliationFrameworkStep;
+import org.opendaylight.yangtools.concepts.Registration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ContextChainImpl implements ContextChain {
private static final Logger LOG = LoggerFactory.getLogger(ContextChainImpl.class);
+ private static final Logger OF_EVENT_LOG = LoggerFactory.getLogger("OfEventLog");
private final AtomicBoolean masterStateOnDevice = new AtomicBoolean(false);
- private final AtomicBoolean initialGathering = new AtomicBoolean(false);
private final AtomicBoolean initialSubmitting = new AtomicBoolean(false);
- private final AtomicBoolean registryFilling = new AtomicBoolean(false);
private final AtomicBoolean rpcRegistration = new AtomicBoolean(false);
private final List<DeviceRemovedHandler> deviceRemovedHandlers = new CopyOnWriteArrayList<>();
private final List<GuardedContext> contexts = new CopyOnWriteArrayList<>();
private final List<ConnectionContext> auxiliaryConnections = new CopyOnWriteArrayList<>();
- private final ExecutorService executorService;
+ private final Executor executor;
private final ContextChainMastershipWatcher contextChainMastershipWatcher;
private final DeviceInfo deviceInfo;
private final ConnectionContext primaryConnection;
private final AtomicReference<ContextChainState> contextChainState =
new AtomicReference<>(ContextChainState.UNDEFINED);
- private AutoCloseable registration;
-
- ContextChainImpl(@Nonnull final ContextChainMastershipWatcher contextChainMastershipWatcher,
- @Nonnull final ConnectionContext connectionContext,
- @Nonnull final ExecutorService executorService) {
- this.contextChainMastershipWatcher = contextChainMastershipWatcher;
- this.primaryConnection = connectionContext;
- this.deviceInfo = connectionContext.getDeviceInfo();
- this.executorService = executorService;
+ private Registration registration;
+
+ ContextChainImpl(final @NonNull ContextChainMastershipWatcher contextChainMastershipWatcher,
+ final @NonNull ConnectionContext connectionContext, final @NonNull Executor executor) {
+ this.contextChainMastershipWatcher = requireNonNull(contextChainMastershipWatcher);
+ primaryConnection = requireNonNull(connectionContext);
+ this.executor = requireNonNull(executor);
+ deviceInfo = connectionContext.getDeviceInfo();
}
@Override
- public <T extends OFPContext> void addContext(@Nonnull final T context) {
+ public <T extends OFPContext> void addContext(@NonNull final T context) {
contexts.add(new GuardedContextImpl(context));
}
@Override
@SuppressWarnings("checkstyle:IllegalCatch")
public void instantiateServiceInstance() {
+ OF_EVENT_LOG.debug("Clustering Service Invocation, Node: {}", deviceInfo);
try {
contexts.forEach(OFPContext::instantiateServiceInstance);
LOG.info("Started clustering services for node {}", deviceInfo);
} catch (final Exception ex) {
- LOG.warn("Not able to start clustering services for node {}", deviceInfo);
- executorService.submit(() -> contextChainMastershipWatcher
+ LOG.error("Not able to start clustering services for node {}", deviceInfo);
+ executor.execute(() -> contextChainMastershipWatcher
.onNotAbleToStartMastershipMandatory(deviceInfo, ex.toString()));
}
}
@Override
- public ListenableFuture<Void> closeServiceInstance() {
+ public ListenableFuture<?> closeServiceInstance() {
contextChainMastershipWatcher.onSlaveRoleAcquired(deviceInfo);
- final ListenableFuture<List<Void>> servicesToBeClosed = Futures
- .allAsList(Lists.reverse(contexts)
- .stream()
- .map(OFPContext::closeServiceInstance)
- .collect(Collectors.toList()));
+ final ListenableFuture<?> servicesToBeClosed = Futures.allAsList(Lists.reverse(contexts).stream()
+ .map(OFPContext::closeServiceInstance)
+ .collect(Collectors.toList()));
- return Futures.transform(servicesToBeClosed, (input) -> {
+ return Futures.transform(servicesToBeClosed, input -> {
+ OF_EVENT_LOG.debug("Closing clustering Services, Node: {}", deviceInfo);
LOG.info("Closed clustering services for node {}", deviceInfo);
return null;
- }, executorService);
+ }, executor);
}
- @Nonnull
@Override
public ServiceGroupIdentifier getIdentifier() {
return deviceInfo.getServiceIdentifier();
auxiliaryConnections.clear();
// If we are still registered and we are not already closing, then close the registration
- if (Objects.nonNull(registration)) {
- try {
- registration.close();
- registration = null;
- LOG.info("Closed clustering services registration for node {}", deviceInfo);
- } catch (final Exception e) {
- LOG.warn("Failed to close clustering services registration for node {} with exception: ",
- deviceInfo, e);
- }
+ if (registration != null) {
+ registration.close();
+ registration = null;
+ LOG.info("Closed clustering services registration for node {}", deviceInfo);
+ OF_EVENT_LOG.debug("Closed clustering services registration for node {}", deviceInfo);
}
@Override
public void registerServices(final ClusterSingletonServiceProvider clusterSingletonServiceProvider) {
- registration = Objects.requireNonNull(clusterSingletonServiceProvider
- .registerClusterSingletonService(this));
+ registration = clusterSingletonServiceProvider.registerClusterSingletonService(this);
LOG.debug("Registered clustering services for node {}", deviceInfo);
+ OF_EVENT_LOG.debug("Registered Clustering Services, Node: {}", deviceInfo);
}
@Override
- public boolean isMastered(@Nonnull ContextChainMastershipState mastershipState,
- boolean inReconciliationFrameworkStep) {
+ public boolean isMastered(final ContextChainMastershipState mastershipState,
+ final boolean inReconciliationFrameworkStep) {
switch (mastershipState) {
case INITIAL_SUBMIT:
LOG.debug("Device {}, initial submit OK.", deviceInfo);
- this.initialSubmitting.set(true);
+ OF_EVENT_LOG.debug("Device {}, initial submit OK.", deviceInfo);
+ initialSubmitting.set(true);
break;
case MASTER_ON_DEVICE:
LOG.debug("Device {}, master state OK.", deviceInfo);
- this.masterStateOnDevice.set(true);
- break;
- case INITIAL_GATHERING:
- LOG.debug("Device {}, initial gathering OK.", deviceInfo);
- this.initialGathering.set(true);
+ OF_EVENT_LOG.debug("Device {}, master state OK.", deviceInfo);
+ masterStateOnDevice.set(true);
break;
case RPC_REGISTRATION:
LOG.debug("Device {}, RPC registration OK.", deviceInfo);
- this.rpcRegistration.set(true);
- break;
- case INITIAL_FLOW_REGISTRY_FILL:
- // Flow registry fill is not mandatory to work as a master
- LOG.debug("Device {}, initial registry filling OK.", deviceInfo);
- this.registryFilling.set(true);
+ OF_EVENT_LOG.debug("Device {}, RPC registration OK.", deviceInfo);
+ rpcRegistration.set(true);
break;
case CHECK:
// no operation
break;
}
- final boolean result = initialGathering.get() &&
- masterStateOnDevice.get() &&
- rpcRegistration.get() &&
- inReconciliationFrameworkStep || initialSubmitting.get();
-
- if (!inReconciliationFrameworkStep &&
- result &&
- mastershipState != ContextChainMastershipState.CHECK) {
- LOG.info("Device {} is able to work as master{}",
- deviceInfo,
- registryFilling.get() ? "." : " WITHOUT flow registry !!!");
+ final boolean result = masterStateOnDevice.get() && rpcRegistration.get()
+ && inReconciliationFrameworkStep || initialSubmitting.get();
+
+ if (!inReconciliationFrameworkStep && result && mastershipState != ContextChainMastershipState.CHECK) {
+ LOG.info("Device {} is able to work as master", deviceInfo);
changeMastershipState(ContextChainState.WORKING_MASTER);
}
}
@Override
- public boolean addAuxiliaryConnection(@Nonnull ConnectionContext connectionContext) {
- return (connectionContext.getFeatures().getAuxiliaryId() != 0)
- && (!ConnectionContext.CONNECTION_STATE.RIP.equals(primaryConnection.getConnectionState()))
+ public void initializeDevice() {
+ contexts.forEach(context -> {
+ if (context.map(DeviceInitializationContext.class::isInstance)) {
+ context.map(DeviceInitializationContext.class::cast).initializeDevice();
+ }
+ });
+ }
+
+ @Override
+ public boolean addAuxiliaryConnection(@NonNull final ConnectionContext connectionContext) {
+ return connectionContext.getFeatures().getAuxiliaryId().toJava() != 0
+ && !ConnectionContext.CONNECTION_STATE.RIP.equals(primaryConnection.getConnectionState())
&& auxiliaryConnections.add(connectionContext);
}
@Override
- public boolean auxiliaryConnectionDropped(@Nonnull ConnectionContext connectionContext) {
+ public boolean auxiliaryConnectionDropped(@NonNull final ConnectionContext connectionContext) {
return auxiliaryConnections.remove(connectionContext);
}
@Override
- public void registerDeviceRemovedHandler(@Nonnull final DeviceRemovedHandler deviceRemovedHandler) {
+ public void registerDeviceRemovedHandler(@NonNull final DeviceRemovedHandler deviceRemovedHandler) {
deviceRemovedHandlers.add(deviceRemovedHandler);
}
- private void changeMastershipState(final ContextChainState contextChainState) {
- if (ContextChainState.CLOSED.equals(this.contextChainState.get())) {
+ private void changeMastershipState(final ContextChainState newContextChainState) {
+ if (ContextChainState.CLOSED.equals(contextChainState.get())) {
return;
}
- boolean propagate = ContextChainState.UNDEFINED.equals(this.contextChainState.get());
- this.contextChainState.set(contextChainState);
+ boolean propagate = ContextChainState.UNDEFINED.equals(contextChainState.get());
+ contextChainState.set(newContextChainState);
if (propagate) {
contexts.forEach(context -> {
if (context.map(ContextChainStateListener.class::isInstance)) {
- context.map(ContextChainStateListener.class::cast).onStateAcquired(contextChainState);
+ context.map(ContextChainStateListener.class::cast).onStateAcquired(newContextChainState);
}
});
}
}
private void unMasterMe() {
- registryFilling.set(false);
initialSubmitting.set(false);
- initialGathering.set(false);
masterStateOnDevice.set(false);
rpcRegistration.set(false);
}
-}
\ No newline at end of file
+}