List<MonitorRequest> monitorRequests,
MonitorCallBack callback);
+ /**
+ * ovsdb <a href="http://tools.ietf.org/html/draft-pfaff-ovsdb-proto-04#section-4.1.5">monitor</a> operation.
+ * @param monitorRequests represents what needs to be monitored including a client specified monitor handle. This
+ * handle is used to later cancel ({@link #cancelMonitor(MonitorHandle)}) the monitor.
+ * @param callback receives the monitor response
+ * @param timeout time in seconds for monitor transaction timeout
+ */
+ <E extends TableSchema<E>> TableUpdates monitor(DatabaseSchema schema,
+ List<MonitorRequest> monitorRequests,
+ MonitorCallBack callback,
+ int timeout);
+
/**
* ovsdb <a href="http://tools.ietf.org/html/draft-pfaff-ovsdb-proto-04#section-4.1.5">monitor</a> operation.
* @param monitorRequests represents what needs to be monitored
MonitorHandle monitorHandle,
MonitorCallBack callback);
+ /**
+ * ovsdb <a href="http://tools.ietf.org/html/draft-pfaff-ovsdb-proto-04#section-4.1.5">monitor</a> operation.
+ * @param monitorRequests represents what needs to be monitored
+ * @param monitorHandle A client specified monitor handle. This handle is used to later cancel
+ * ({@link #cancelMonitor(MonitorHandle)}) the monitor.
+ * @param callback receives the monitor response
+ * @param timeout time in seconds for monitor transaction timeout
+ */
+ <E extends TableSchema<E>> TableUpdates monitor(DatabaseSchema schema,
+ List<MonitorRequest> monitorRequests,
+ MonitorHandle monitorHandle,
+ MonitorCallBack callback,
+ int timeout);
+
/**
* Cancels an existing monitor method.
* @param handler Handle identifying a specific monitor request that is being cancelled.
*/
void cancelMonitor(MonitorHandle handler);
+ /**
+ * Cancels an existing monitor method.
+ * @param handler Handle identifying a specific monitor request that is being cancelled.
+ * @param timeout time in seconds for monitor transaction timeout
+ * @throws java.lang.IllegalStateException if there is no outstanding monitor request for this handle
+ */
+ void cancelMonitor(MonitorHandle handler, int timeout);
+
/**
* ovsdb <a href="http://tools.ietf.org/html/draft-pfaff-ovsdb-proto-04#section-4.1.8">lock</a> operation.
* @param lockId a client specified id for the lock; this can be used for unlocking ({@link #unLock(String)})
boolean startOvsdbManagerWithSsl(int ovsdbListenPort,
SSLContext sslContext, String[] protocols, String[] cipherSuites);
+ /**
+ * Method to restart ovsdb server for passive connection with SSL and user
+ * specifies protocols and cipher suites.
+ */
+ boolean restartOvsdbManagerWithSsl(int ovsdbListenPort,
+ SSLContext sslContext,
+ String[] protocols,
+ String[] cipherSuites);
+
/**
* Method to register a Passive Connection Listener with the ConnectionService.
* @param listener Passive Connection listener interested in Passive OVSDB connection requests.
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import org.opendaylight.ovsdb.lib.EchoServiceCallbackFilters;
import org.opendaylight.ovsdb.lib.LockAquisitionCallback;
import org.opendaylight.ovsdb.lib.LockStolenCallback;
private OvsdbConnectionInfo connectionInfo;
private Channel channel;
private boolean isConnectionPublished;
+ private static final int NO_TIMEOUT = -1;
private static final ThreadFactory THREAD_FACTORY_SSL =
new ThreadFactoryBuilder().setNameFormat("OVSDB-PassiveConnection-SSL-%d").build();
return FutureTransformUtils.transformTransactResponse(rpc.transact(builder), operations);
}
+ @Override
+ public <E extends TableSchema<E>> TableUpdates monitor(final DatabaseSchema dbSchema,
+ List<MonitorRequest> monitorRequest,
+ final MonitorCallBack callback) {
+ return monitor(dbSchema, monitorRequest, callback, NO_TIMEOUT);
+ }
+
@Override
public <E extends TableSchema<E>> TableUpdates monitor(final DatabaseSchema dbSchema,
List<MonitorRequest> monitorRequest,
- final MonitorCallBack callback) {
+ final MonitorCallBack callback,
+ int timeout) {
final ImmutableMap<String, MonitorRequest> reqMap = Maps.uniqueIndex(monitorRequest,
MonitorRequest::getTableName);
() -> Arrays.asList(dbSchema.getName(), monitorHandle.getId(), reqMap));
JsonNode result;
try {
- result = monitor.get();
- } catch (InterruptedException | ExecutionException e) {
+ if (timeout == NO_TIMEOUT) {
+ result = monitor.get();
+ } else {
+ result = monitor.get(timeout, TimeUnit.SECONDS);
+ }
+ } catch (InterruptedException | ExecutionException | TimeoutException e) {
LOG.warn("Failed to monitor {}", dbSchema, e);
return null;
}
List<MonitorRequest> monitorRequest,
final MonitorHandle monitorHandle,
final MonitorCallBack callback) {
+ return monitor(dbSchema, monitorRequest, monitorHandle, callback, NO_TIMEOUT);
+ }
+
+ @Override
+ public <E extends TableSchema<E>> TableUpdates monitor(final DatabaseSchema dbSchema,
+ List<MonitorRequest> monitorRequest,
+ final MonitorHandle monitorHandle,
+ final MonitorCallBack callback,
+ int timeout) {
final ImmutableMap<String, MonitorRequest> reqMap = Maps.uniqueIndex(monitorRequest,
MonitorRequest::getTableName);
() -> Arrays.asList(dbSchema.getName(), monitorHandle.getId(), reqMap));
JsonNode result;
try {
- result = monitor.get();
- } catch (InterruptedException | ExecutionException e) {
+ if (timeout == NO_TIMEOUT) {
+ result = monitor.get();
+ } else {
+ result = monitor.get(timeout, TimeUnit.SECONDS);
+ }
+ } catch (InterruptedException | ExecutionException | TimeoutException e) {
LOG.warn("Failed to monitor {}", dbSchema, e);
return null;
}
@Override
public void cancelMonitor(final MonitorHandle handler) {
+ cancelMonitor(handler, NO_TIMEOUT);
+ }
+
+ @Override
+ public void cancelMonitor(final MonitorHandle handler, int timeout) {
ListenableFuture<JsonNode> cancelMonitor = rpc.monitor_cancel(() -> Collections.singletonList(handler.getId()));
JsonNode result = null;
try {
- result = cancelMonitor.get();
- } catch (InterruptedException | ExecutionException e) {
+ if (timeout == NO_TIMEOUT) {
+ result = cancelMonitor.get();
+ } else {
+ result = cancelMonitor.get(timeout, TimeUnit.SECONDS);
+ }
+ } catch (InterruptedException | ExecutionException | TimeoutException e) {
LOG.error("Exception when canceling monitor handler {}", handler.getId(), e);
}
*/
public class OvsdbConnectionService implements AutoCloseable, OvsdbConnection {
private static final Logger LOG = LoggerFactory.getLogger(OvsdbConnectionService.class);
-
private static ThreadFactory passiveConnectionThreadFactory = new ThreadFactoryBuilder()
.setNameFormat("OVSDBPassiveConnServ-%d").build();
private static ScheduledExecutorService executorService
private static final StalePassiveConnectionService STALE_PASSIVE_CONNECTION_SERVICE =
new StalePassiveConnectionService(executorService);
+ private static Channel serverChannel = null;
private static int retryPeriod = 100; // retry after 100 milliseconds
}
}
+ @Override
+ public synchronized boolean restartOvsdbManagerWithSsl(final int ovsdbListenPort,
+ final SSLContext sslContext,
+ final String[] protocols,
+ final String[] cipherSuites) {
+ if (singletonCreated.getAndSet(false) && (serverChannel != null)) {
+ serverChannel.close();
+ LOG.info("Server channel closed");
+ }
+ serverChannel = null;
+ return startOvsdbManagerWithSsl(ovsdbListenPort, sslContext, protocols, cipherSuites);
+ }
+
/**
* OVSDB Passive listening thread that uses Netty ServerBootstrap to open
* passive connection handle channel callbacks.
// Start the server.
ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
Channel serverListenChannel = channelFuture.channel();
+ serverChannel = serverListenChannel;
// Wait until the server socket is closed.
serverListenChannel.closeFuture().sync();
} catch (InterruptedException e) {
this.retryTimes = 3;
}
+ private void retry() {
+ if (retryTimes > 0) {
+ executorService.schedule(this, retryPeriod, TimeUnit.MILLISECONDS);
+ } else {
+ LOG.debug("channel closed {}", channel);
+ channel.disconnect();
+ }
+ retryTimes--;
+ }
+
@Override
public void run() {
HandshakeStatus status = sslHandler.engine().getHandshakeStatus();
.equals("SSL_NULL_WITH_NULL_NULL")) {
// Not begin handshake yet. Retry later.
LOG.debug("handshake not begin yet {}", status);
- executorService.schedule(this, retryPeriod, TimeUnit.MILLISECONDS);
+ retry();
} else {
//Check if peer is trusted before notifying listeners
try {
} catch (SSLPeerUnverifiedException e) {
//Trust manager is still checking peer certificate. Retry later
LOG.debug("Peer certifiacte is not verified yet {}", status);
- executorService.schedule(this, retryPeriod, TimeUnit.MILLISECONDS);
+ retry();
}
}
break;
case NEED_TASK:
//Handshake still ongoing. Retry later.
LOG.debug("handshake not done yet {}", status);
- executorService.schedule(this, retryPeriod, TimeUnit.MILLISECONDS);
+ retry();
break;
case NEED_WRAP:
.equals("SSL_NULL_WITH_NULL_NULL")) {
/* peer not authenticated. No need to notify listener in this case. */
LOG.error("Ssl handshake fail. channel {}", channel);
+ channel.disconnect();
} else {
/*
* peer is authenticated. Give some time to wait for completion.
* since client will reconnect later.
*/
LOG.debug("handshake not done yet {}", status);
- if (retryTimes > 0) {
- executorService.schedule(this, retryPeriod, TimeUnit.MILLISECONDS);
- } else {
- LOG.debug("channel closed {}", channel);
- }
- retryTimes--;
+ retry();
}
break;
import java.util.List;
import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
ObjectMapper objectMapper;
Channel nettyChannel;
- Map<String, CallContext> methodContext = new HashMap<>();
+ Map<String, CallContext> methodContext = new ConcurrentHashMap<>();
Map<Object, OvsdbRPC.Callback> requestCallbacks = new HashMap<>();
public JsonRpcEndpoint(ObjectMapper objectMapper, Channel channel) {
LESS_THAN_OR_EQUALS("<="),
EQUALS("=="),
NOT_EQUALS("!="),
- GREATER_THAN(">="),
+ GREATER_THAN(">"),
GREATER_THAN_OR_EQUALS(">="),
INCLUDES("includes"),
EXCLUDES("excludes");