Fix checkstyle API.
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / device / DeviceContextImpl.java
index 45061de13808c2d9183a0cecf6cc1f9726441610..c7141da52bc8e95d05d56b4ec0ffd811be38e4b9 100644 (file)
@@ -8,7 +8,6 @@
 package org.opendaylight.openflowplugin.impl.device;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
 import com.google.common.base.Optional;
 import com.google.common.base.Verify;
 import com.google.common.util.concurrent.FutureCallback;
@@ -27,6 +26,7 @@ import java.util.Objects;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
@@ -35,12 +35,10 @@ import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
-import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandlerRegistration;
 import org.opendaylight.openflowjava.protocol.api.keys.MessageTypeKey;
 import org.opendaylight.openflowplugin.api.ConnectionException;
 import org.opendaylight.openflowplugin.api.OFConstants;
 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
-import org.opendaylight.openflowplugin.api.openflow.connection.OutboundQueueProvider;
 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
 import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager;
@@ -52,7 +50,7 @@ import org.opendaylight.openflowplugin.api.openflow.device.Xid;
 import org.opendaylight.openflowplugin.api.openflow.device.handlers.ClusterInitializationPhaseHandler;
 import org.opendaylight.openflowplugin.api.openflow.device.handlers.MultiMsgCollector;
 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipState;
-import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleService;
+import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainState;
 import org.opendaylight.openflowplugin.api.openflow.lifecycle.MastershipChangeListener;
 import org.opendaylight.openflowplugin.api.openflow.md.core.SwitchConnectionDistinguisher;
 import org.opendaylight.openflowplugin.api.openflow.md.core.TranslatorKey;
@@ -77,7 +75,6 @@ import org.opendaylight.openflowplugin.impl.datastore.MultipartWriterProviderFac
 import org.opendaylight.openflowplugin.impl.device.initialization.AbstractDeviceInitializer;
 import org.opendaylight.openflowplugin.impl.device.initialization.DeviceInitializerProvider;
 import org.opendaylight.openflowplugin.impl.device.listener.MultiMsgCollectorImpl;
-import org.opendaylight.openflowplugin.impl.device.listener.OpenflowProtocolListenerFullImpl;
 import org.opendaylight.openflowplugin.impl.registry.flow.DeviceFlowRegistryImpl;
 import org.opendaylight.openflowplugin.impl.registry.flow.FlowRegistryKeyFactory;
 import org.opendaylight.openflowplugin.impl.registry.group.DeviceGroupRegistryImpl;
@@ -137,6 +134,9 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi
     // Timeout in seconds after what we will give up on propagating role
     private static final int SET_ROLE_TIMEOUT = 10;
 
+    // Timeout in milliseconds after what we will give up on initializing device
+    private static final int DEVICE_INIT_TIMEOUT = 9000;
+
     private static final int LOW_WATERMARK = 1000;
     private static final int HIGH_WATERMARK = 2000;
     private final MultipartWriterProvider writerProvider;
@@ -168,12 +168,12 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi
     private boolean switchFeaturesMandatory;
     private DeviceInfo deviceInfo;
     private final ConvertorExecutor convertorExecutor;
-    private volatile CONTEXT_STATE state;
+    private volatile ContextState state;
     private ClusterInitializationPhaseHandler clusterInitializationPhaseHandler;
     private final DeviceManager myManager;
     private final DeviceInitializerProvider deviceInitializerProvider;
     private final boolean useSingleLayerSerialization;
-    private OutboundQueueProvider outboundQueueProvider;
+    private boolean hasState;
     private boolean isInitialTransactionSubmitted;
 
     DeviceContextImpl(
@@ -189,7 +189,6 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi
             final DeviceInitializerProvider deviceInitializerProvider) {
 
         this.primaryConnectionContext = primaryConnectionContext;
-        this.outboundQueueProvider = (OutboundQueueProvider) primaryConnectionContext.getOutboundQueueProvider();
         this.deviceInfo = primaryConnectionContext.getDeviceInfo();
         this.hashedWheelTimer = hashedWheelTimer;
         this.deviceInitializerProvider = deviceInitializerProvider;
@@ -213,7 +212,7 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi
         this.itemLifeCycleSourceRegistry = new ItemLifeCycleRegistryImpl();
         this.flowLifeCycleKeeper = new ItemLifeCycleSourceImpl();
         this.itemLifeCycleSourceRegistry.registerLifeCycleSource(flowLifeCycleKeeper);
-        this.state = CONTEXT_STATE.INITIALIZATION;
+        this.state = ContextState.INITIALIZATION;
         this.convertorExecutor = convertorExecutor;
         this.skipTableFeatures = skipTableFeatures;
         this.useSingleLayerSerialization = useSingleLayerSerialization;
@@ -223,12 +222,8 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi
 
     @Override
     public boolean initialSubmitTransaction() {
-        if (initialized) {
-            isInitialTransactionSubmitted = true;
-            return transactionChainManager.initialSubmitWriteTransaction();
-        }
-
-        return false;
+        return (initialized &&(isInitialTransactionSubmitted =
+                transactionChainManager.initialSubmitWriteTransaction()));
     }
 
     @Override
@@ -259,6 +254,11 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi
         return dataBroker.newReadOnlyTransaction();
     }
 
+    @Override
+    public boolean isTransactionsEnabled() {
+        return isInitialTransactionSubmitted;
+    }
+
     @Override
     public <T extends DataObject> void writeToTransaction(final LogicalDatastoreType store,
                                                           final InstanceIdentifier<T> path,
@@ -319,8 +319,8 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi
         messageSpy.spyMessage(
                 ofHeader.getImplementedInterface(),
                 (ofHeader instanceof Error)
-                        ? MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_FAILURE
-                        : MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_SUCCESS);
+                        ? MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_FAILURE
+                        : MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_SUCCESS);
     }
 
     @Override
@@ -328,8 +328,8 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi
         ofHeaderList.forEach(header -> messageSpy.spyMessage(
                 header.getImplementedInterface(),
                 (header instanceof Error)
-                        ? MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_FAILURE
-                        : MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_SUCCESS));
+                        ? MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_FAILURE
+                        : MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_SUCCESS));
     }
 
     @Override
@@ -369,9 +369,9 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi
 
     @Override
     public void processPortStatusMessage(final PortStatusMessage portStatus) {
-        messageSpy.spyMessage(portStatus.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_SUCCESS);
+        messageSpy.spyMessage(portStatus.getImplementedInterface(), MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_SUCCESS);
 
-        if (isInitialTransactionSubmitted) {
+        if (initialized) {
             try {
                 writePortStatusMessage(portStatus);
                 submitTransaction();
@@ -379,6 +379,8 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi
                 LOG.warn("Error processing port status message for port {} on device {}",
                         portStatus.getPortNo(), getDeviceInfo().getLOGValue(), e);
             }
+        } else if (!hasState) {
+            primaryConnectionContext.handlePortStatusMessage(portStatus);
         }
     }
 
@@ -408,29 +410,29 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi
 
     @Override
     public void processPacketInMessage(final PacketInMessage packetInMessage) {
-        messageSpy.spyMessage(packetInMessage.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH);
+        messageSpy.spyMessage(packetInMessage.getImplementedInterface(), MessageSpy.StatisticsGroup.FROM_SWITCH);
         final ConnectionAdapter connectionAdapter = getPrimaryConnectionContext().getConnectionAdapter();
         final PacketReceived packetReceived = packetInTranslator.translate(packetInMessage, getDeviceInfo(), null);
 
         if (packetReceived == null) {
             LOG.debug("Received a null packet from switch {}", connectionAdapter.getRemoteAddress());
-            messageSpy.spyMessage(packetInMessage.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_TRANSLATE_SRC_FAILURE);
+            messageSpy.spyMessage(packetInMessage.getImplementedInterface(), MessageSpy.StatisticsGroup.FROM_SWITCH_TRANSLATE_SRC_FAILURE);
             return;
         } else {
-            messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_TRANSLATE_OUT_SUCCESS);
+            messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.StatisticsGroup.FROM_SWITCH_TRANSLATE_OUT_SUCCESS);
         }
 
         if (!packetInLimiter.acquirePermit()) {
             LOG.debug("Packet limited");
             // TODO: save packet into emergency slot if possible
-            messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PACKET_IN_LIMIT_REACHED_AND_DROPPED);
+            messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.StatisticsGroup.FROM_SWITCH_PACKET_IN_LIMIT_REACHED_AND_DROPPED);
             return;
         }
 
         final ListenableFuture<?> offerNotification = notificationPublishService.offerNotification(packetReceived);
         if (NotificationPublishService.REJECTED.equals(offerNotification)) {
             LOG.debug("notification offer rejected");
-            messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_NOTIFICATION_REJECTED);
+            messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.StatisticsGroup.FROM_SWITCH_NOTIFICATION_REJECTED);
             packetInLimiter.drainLowWaterMark();
             packetInLimiter.releasePermit();
             return;
@@ -439,13 +441,13 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi
         Futures.addCallback(offerNotification, new FutureCallback<Object>() {
             @Override
             public void onSuccess(final Object result) {
-                messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_SUCCESS);
+                messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_SUCCESS);
                 packetInLimiter.releasePermit();
             }
 
             @Override
             public void onFailure(final Throwable t) {
-                messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_NOTIFICATION_REJECTED);
+                messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.StatisticsGroup.FROM_SWITCH_NOTIFICATION_REJECTED);
                 LOG.debug("notification offer failed: {}", t.getMessage());
                 LOG.trace("notification offer failed..", t);
                 packetInLimiter.releasePermit();
@@ -508,8 +510,8 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi
 
     @Override
     public void onPublished() {
-        Verify.verify(CONTEXT_STATE.INITIALIZATION.equals(getState()));
-        this.state = CONTEXT_STATE.WORKING;
+        Verify.verify(ContextState.INITIALIZATION.equals(getState()));
+        this.state = ContextState.WORKING;
         primaryConnectionContext.getConnectionAdapter().setPacketInFiltering(false);
     }
 
@@ -543,7 +545,7 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi
         if (LOG.isDebugEnabled()) {
             LOG.debug("Shutdown method for node {}", getDeviceInfo().getLOGValue());
         }
-        if (CONTEXT_STATE.TERMINATION.equals(getState())) {
+        if (ContextState.TERMINATION.equals(getState())) {
             LOG.debug("DeviceCtx for Node {} is in termination process.", getDeviceInfo().getLOGValue());
             return;
         }
@@ -588,7 +590,7 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi
     }
 
     @Override
-    public CONTEXT_STATE getState() {
+    public ContextState getState() {
         return this.state;
     }
 
@@ -614,13 +616,6 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi
         //NOOP
     }
 
-    @Override
-    public void putLifecycleServiceIntoTxChainManager(final LifecycleService lifecycleService){
-        if (initialized) {
-            this.transactionChainManager.setLifecycleService(lifecycleService);
-        }
-    }
-
     @Override
     public boolean canUseSingleLayerSerialization() {
         return useSingleLayerSerialization && getDeviceInfo().getVersion() >= OFConstants.OFP_VERSION_1_3;
@@ -646,19 +641,32 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi
 
         LOG.info("Starting device context cluster services for node {}", deviceInfo.getLOGValue());
         lazyTransactionManagerInitialization();
-        this.transactionChainManager.activateTransactionManager();
+
+        try {
+            final List<PortStatusMessage> portStatusMessages = primaryConnectionContext
+                    .retrieveAndClearPortStatusMessages();
+
+            portStatusMessages.forEach(this::writePortStatusMessage);
+            submitTransaction();
+        } catch (final Exception ex) {
+            LOG.warn("Error processing port status messages from device {}", getDeviceInfo().getLOGValue(), ex);
+            return false;
+        }
 
         try {
             final java.util.Optional<AbstractDeviceInitializer> initializer = deviceInitializerProvider
                     .lookup(deviceInfo.getVersion());
 
             if (initializer.isPresent()) {
-                initializer.get().initialize(this, switchFeaturesMandatory, writerProvider, convertorExecutor);
+                initializer
+                        .get()
+                        .initialize(this, switchFeaturesMandatory, writerProvider, convertorExecutor)
+                        .get(DEVICE_INIT_TIMEOUT, TimeUnit.MILLISECONDS);
             } else {
                 throw new ExecutionException(new ConnectionException("Unsupported version " + deviceInfo.getVersion()));
             }
-        } catch (ExecutionException | InterruptedException e) {
-            LOG.warn("Device {} cannot be initialized: ", deviceInfo.getLOGValue(), e);
+        } catch (ExecutionException | InterruptedException | TimeoutException ex) {
+            LOG.warn("Device {} cannot be initialized: ", deviceInfo.getLOGValue(), ex);
             return false;
         }
 
@@ -682,6 +690,7 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi
             this.deviceFlowRegistry = new DeviceFlowRegistryImpl(deviceInfo.getVersion(), dataBroker, deviceInfo.getNodeInstanceIdentifier());
             this.deviceGroupRegistry = new DeviceGroupRegistryImpl();
             this.deviceMeterRegistry = new DeviceMeterRegistryImpl();
+            this.transactionChainManager.activateTransactionManager();
             this.initialized = true;
         }
     }
@@ -731,6 +740,11 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi
         return sendRoleChangeToDevice(OfpRole.BECOMESLAVE);
     }
 
+    @Override
+    public void onStateAcquired(final ContextChainState state) {
+        hasState = true;
+    }
+
     private class RpcResultFutureCallback implements FutureCallback<RpcResult<SetRoleOutput>> {
 
         private final MastershipChangeListener mastershipChangeListener;