Connector: bug: CatalogRequest call fails if number of entries in policy or contractdefinition or contract negotiation is more than edc.sql.fetch.size limit
Bug Report
Describe the Bug
CatalogRequest call fails with edcexception if number entries in policy or contractdefinition or contract negotiation tables are more than limit mentioned in edc.sql.fetch.size. This value will be set before executeQuery --> preparedStatement.setFetchSize(fetchSize) Issue occurs since ResultSet is closed before all (More than fetch limit) data retrieved. From postgres databse we get error “portal ‘c_n’ does not exist”.
Expected Behavior
Irrespective of fetchSize limit set all data has to be streamed and retrieved.
Observed Behavior
resultset, connection, statement gets closed before all data retrieved.
Steps to Reproduce
Steps to reproduce the behavior:
- set edc.sql.fetch.size value to low number …i.e example: 5
- Create more than 5 entries in asset, policy, contractdefinition on provider side.
- Make a Catalog request call from Consumer to provider. Exception occurs.
Context Information
- EDC v0.2.1
- OS: ios/Windows
- Database: postgres
Detailed Description
Logs:
{ "_index": "logs-json-kyma-2023.11.20", "_type": "_doc", "_id": "1i3f7IsBkyIsxaw_Ex0w", "_version": 1, "_score": null, "_source": { "date": 1700486117.340547, "log": "{\"level\":\"SEVERE\",\"msg\":\"JerseyExtension: Unexpected exception caught\",\"logger\":\"org.eclipse.edc.monitor.logger.LoggerMonitor\",\"sequenceNumber\":615,\"sourceClassName\":\"org.eclipse.edc.monitor.logger.LoggerMonitor\",\"sourceMethodName\":\"lambda$log$2\",\"stackTrace\":\"org.eclipse.edc.sql.SqlQueryExecutor$1.tryAdvance(SqlQueryExecutor.java:138) java.base/java.util.Spliterator.forEachRemaining(Unknown Source) java.base/java.util.stream.AbstractPipeline.copyInto(Unknown Source) java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(Unknown Source) java.base/java.util.stream.AbstractPipeline.evaluate(Unknown Source) java.base/java.util.stream.AbstractPipeline.evaluateToArrayNode(Unknown Source) java.base/java.util.stream.ReferencePipeline.toArray(Unknown Source) java.base/java.util.stream.ReferencePipeline.toArray(Unknown Source) java.base/java.util.stream.ReferencePipeline.toList(Unknown Source) org.eclipse.edc.connector.catalog.DatasetResolverImpl.query(DatasetResolverImpl.java:59) org.eclipse.edc.connector.service.catalog.CatalogProtocolServiceImpl.getCatalog(CatalogProtocolServiceImpl.java:55) org.eclipse.edc.protocol.dsp.catalog.api.controller.DspCatalogApiController.requestCatalog(DspCatalogApiController.java:108) jdk.internal.reflect.GeneratedMethodAccessor85.invoke(Unknown Source) java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) java.base/java.lang.reflect.Method.invoke(Unknown Source) org.glassfish.jersey.server.model.internal.ResourceMethodInvocationHandlerFactory.lambda$static$0(ResourceMethodInvocationHandlerFactory.java:52) org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher$1.run(AbstractJavaResourceMethodDispatcher.java:146) org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.invoke(AbstractJavaResourceMethodDispatcher.java:189) org.glassfish.jersey.server.model.internal.JavaResourceMethodDispatcherProvider$ResponseOutInvoker.doDispatch(JavaResourceMethodDispatcherProvider.java:176) org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.dispatch(AbstractJavaResourceMethodDispatcher.java:93) org.glassfish.jersey.server.model.ResourceMethodInvoker.invoke(ResourceMethodInvoker.java:478) org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:400) org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:81) org.glassfish.jersey.server.ServerRuntime$1.run(ServerRuntime.java:261) org.glassfish.jersey.internal.Errors$1.call(Errors.java:248) org.glassfish.jersey.internal.Errors$1.call(Errors.java:244) org.glassfish.jersey.internal.Errors.process(Errors.java:292) org.glassfish.jersey.internal.Errors.process(Errors.java:274) org.glassfish.jersey.internal.Errors.process(Errors.java:244) org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:265) org.glassfish.jersey.server.ServerRuntime.process(ServerRuntime.java:240) org.glassfish.jersey.server.ApplicationHandler.handle(ApplicationHandler.java:697) org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:394) org.glassfish.jersey.servlet.WebComponent.service(WebComponent.java:346) org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:357) org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:311) org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:205) org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:764) org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:529) org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:221) org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1381) org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:176) org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:484) org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:174) org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1303) org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:129) org.eclipse.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:192) org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:122) org.eclipse.jetty.server.Server.handle(Server.java:563) org.eclipse.jetty.server.HttpChannel$RequestDispatchable.dispatch(HttpChannel.java:1598) org.eclipse.jetty.server.HttpChannel.dispatch(HttpChannel.java:753) org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:501) org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:287) org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:314) org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:100) org.eclipse.jetty.io.SelectableChannelEndPoint$1.run(SelectableChannelEndPoint.java:53) org.eclipse.jetty.util.thread.strategy.AdaptiveExecutionStrategy.runTask(AdaptiveExecutionStrategy.java:421) org.eclipse.jetty.util.thread.strategy.AdaptiveExecutionStrategy.consumeTask(AdaptiveExecutionStrategy.java:390) org.eclipse.jetty.util.thread.strategy.AdaptiveExecutionStrategy.tryProduce(AdaptiveExecutionStrategy.java:277) org.eclipse.jetty.util.thread.strategy.AdaptiveExecutionStrategy.run(AdaptiveExecutionStrategy.java:199) org.eclipse.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:411) org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:969) org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.doRunJob(QueuedThreadPool.java:1194) org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:1149) java.base/java.lang.Thread.run(Unknown Source)\",\"resourceBundleName\":null,\"instant\":1700486117.340078476,\"threadId\":75}", "level": "SEVERE", "stream": "stderr", "_p": "F", "stackTrace": "org.eclipse.edc.sql.SqlQueryExecutor$1.tryAdvance(SqlQueryExecutor.java:138) java.base/java.util.Spliterator.forEachRemaining(Unknown Source) java.base/java.util.stream.AbstractPipeline.copyInto(Unknown Source) java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(Unknown Source) java.base/java.util.stream.AbstractPipeline.evaluate(Unknown Source) java.base/java.util.stream.AbstractPipeline.evaluateToArrayNode(Unknown Source) java.base/java.util.stream.ReferencePipeline.toArray(Unknown Source) java.base/java.util.stream.ReferencePipeline.toArray(Unknown Source) java.base/java.util.stream.ReferencePipeline.toList(Unknown Source) org.eclipse.edc.connector.catalog.DatasetResolverImpl.query(DatasetResolverImpl.java:59) org.eclipse.edc.connector.service.catalog.CatalogProtocolServiceImpl.getCatalog(CatalogProtocolServiceImpl.java:55) org.eclipse.edc.protocol.dsp.catalog.api.controller.DspCatalogApiController.requestCatalog(DspCatalogApiController.java:108) jdk.internal.reflect.GeneratedMethodAccessor85.invoke(Unknown Source) java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) java.base/java.lang.reflect.Method.invoke(Unknown Source) org.glassfish.jersey.server.model.internal.ResourceMethodInvocationHandlerFactory.lambda$static$0(ResourceMethodInvocationHandlerFactory.java:52) org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher$1.run(AbstractJavaResourceMethodDispatcher.java:146) org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.invoke(AbstractJavaResourceMethodDispatcher.java:189) org.glassfish.jersey.server.model.internal.JavaResourceMethodDispatcherProvider$ResponseOutInvoker.doDispatch(JavaResourceMethodDispatcherProvider.java:176) org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.dispatch(AbstractJavaResourceMethodDispatcher.java:93) org.glassfish.jersey.server.model.ResourceMethodInvoker.invoke(ResourceMethodInvoker.java:478) org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:400) org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:81) org.glassfish.jersey.server.ServerRuntime$1.run(ServerRuntime.java:261) org.glassfish.jersey.internal.Errors$1.call(Errors.java:248) org.glassfish.jersey.internal.Errors$1.call(Errors.java:244) org.glassfish.jersey.internal.Errors.process(Errors.java:292) org.glassfish.jersey.internal.Errors.process(Errors.java:274) org.glassfish.jersey.internal.Errors.process(Errors.java:244) org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:265) org.glassfish.jersey.server.ServerRuntime.process(ServerRuntime.java:240) org.glassfish.jersey.server.ApplicationHandler.handle(ApplicationHandler.java:697) org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:394) org.glassfish.jersey.servlet.WebComponent.service(WebComponent.java:346) org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:357) org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:311) org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:205) org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:764) org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:529) org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:221) org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1381) org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:176) org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:484) org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:174) org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1303) org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:129) org.eclipse.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:192) org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:122) org.eclipse.jetty.server.Server.handle(Server.java:563) org.eclipse.jetty.server.HttpChannel$RequestDispatchable.dispatch(HttpChannel.java:1598) org.eclipse.jetty.server.HttpChannel.dispatch(HttpChannel.java:753) org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:501) org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:287) org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:314) org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:100) org.eclipse.jetty.io.SelectableChannelEndPoint$1.run(SelectableChannelEndPoint.java:53) org.eclipse.jetty.util.thread.strategy.AdaptiveExecutionStrategy.runTask(AdaptiveExecutionStrategy.java:421) org.eclipse.jetty.util.thread.strategy.AdaptiveExecutionStrategy.consumeTask(AdaptiveExecutionStrategy.java:390) org.eclipse.jetty.util.thread.strategy.AdaptiveExecutionStrategy.tryProduce(AdaptiveExecutionStrategy.java:277) org.eclipse.jetty.util.thread.strategy.AdaptiveExecutionStrategy.run(AdaptiveExecutionStrategy.java:199) org.eclipse.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:411) org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:969) org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.doRunJob(QueuedThreadPool.java:1194) org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:1149) java.base/java.lang.Thread.run(Unknown Source)", "kubernetes": { "pod_id": "f3138357-7bf1-489f-880f-cb8c9cdc2284", "host": "ip-10-250-12-42.eu-central-1.compute.internal", "labels": { "app": "controlplane", "pod-template-hash": "f79746765", "app_kubernetes_io_name": "controlplane", "app_kubernetes_io_instance": "t-hotwheels-test" }, "namespace_name": "t-hotwheels-test", "container_name": "control-plane", "container_image": "3f2c5d578081-20231120-09384647-684.staging.repositories.cloud.sap/dataspace-integration/sap-dataspace-connector-controlplane:0.7.2-20231120092931-4fd0126", "pod_name": "control-plane-f79746765-7c7xk", "container_hash": "3f2c5d578081-20231120-09384647-684.staging.repositories.cloud.sap/dataspace-integration/sap-dataspace-connector-controlplane@sha256:b12de2f2bca71630a994c8461bec1822fd8efb709b8e2f24073f269f99b58ef2", "docker_id": "97f55b9b3d5d42eb064b6ce35c903af33aebc4d32cca70b6dad9ea3f94ffbe95" }, "instant": 1700486117.340079, "msg": "JerseyExtension: Unexpected exception caught", "time": "2023-11-20T13:15:17.340546733Z", "threadId": 75, "cluster_identifier": "api.c-139b975.kyma-stage.internal.live.k8s.ondemand.com", "sourceMethodName": "lambda$log$2", "sourceClassName": "org.eclipse.edc.monitor.logger.LoggerMonitor", "sequenceNumber": 615, "logger": "org.eclipse.edc.monitor.logger.LoggerMonitor", "_cls_parse_ts": 1700486117483, "@timestamp": "2023-11-20T13:15:17.483936343+00:00" }, "fields": { "@timestamp": [ "2023-11-20T13:15:17.483Z" ], "time": [ "2023-11-20T13:15:17.340Z" ] }, "highlight": { "kubernetes.namespace_name": [ "@opensearch-dashboards-highlighted-field@t@/opensearch-dashboards-highlighted-field@-@opensearch-dashboards-highlighted-field@hotwheels@/opensearch-dashboards-highlighted-field@-@opensearch-dashboards-highlighted-field@test@/opensearch-dashboards-highlighted-field@" ] }, "sort": [ 1700486117483 ] }
Possible Implementation
In class: SqlQueryExecutor.
@Override
public <T> Stream<T> query(Connection connection, boolean closeConnection, ResultSetMapper<T> resultSetMapper, String sql, Object... arguments) {
Objects.requireNonNull(connection, "connection");
Objects.requireNonNull(resultSetMapper, "resultSetMapper");
Objects.requireNonNull(sql, "sql");
Objects.requireNonNull(arguments, "arguments");
return StreamSupport
.stream(Spliterators.spliteratorUnknownSize(
new ResultSetIterator<T>(connection, resultSetMapper, sql, configuration, arguments), 0), false);
}
And new iterator class
package com.sap.it.dsi.connector.common.validation;
import org.eclipse.edc.spi.EdcException;
import org.eclipse.edc.sql.ResultSetMapper;
import org.eclipse.edc.sql.SqlQueryExecutorConfiguration;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.Iterator;
public class ResultSetIterator<T> implements Iterator {
private ResultSet rs;
private PreparedStatement ps;
private Connection connection;
private String sql;
private ResultSetMapper<T> resultSetMapper;
private Object[] arguments;
private SqlQueryExecutorConfiguration configuration;
public ResultSetIterator(Connection connection, ResultSetMapper<T> resultSetMapper, String sql, SqlQueryExecutorConfiguration configuration, Object... arguments) {
assert connection != null;
assert sql != null;
this.connection = connection;
this.sql = sql;
this.resultSetMapper = resultSetMapper;
this.arguments = arguments;
this.configuration = configuration;
}
private void setArguments(PreparedStatement statement, Object[] arguments) throws SQLException {
for (int index = 0; index < arguments.length; index++) {
int position = index + 1;
setArgument(statement, position, arguments[index]);
}
}
private void setArgument(PreparedStatement statement, int position, Object argument) throws SQLException {
var argumentHandler = Arrays.stream(ArgumentHandlers.values()).filter(it -> it.accepts(argument)).findFirst().orElse(null);
if (argumentHandler != null) {
argumentHandler.handle(statement, position, argument);
} else {
statement.setObject(position, argument);
}
}
public void init() {
try {
ps = connection.prepareStatement(sql);
setArguments(ps, arguments);
ps.setFetchSize(configuration.fetchSize());
rs = ps.executeQuery();
} catch (SQLException e) {
close();
throw new EdcException(e);
}
}
@Override
public boolean hasNext() {
if (ps == null) {
init();
}
try {
boolean hasMore = rs.next();
if (!hasMore) {
close();
}
return hasMore;
} catch (SQLException e) {
close();
throw new EdcException(e);
}
}
private void close() {
try {
rs.close();
try {
ps.close();
} catch (SQLException e) {
//nothing we can do here
}
} catch (SQLException e) {
//nothing we can do here
}
}
@Override
public Object next() {
try {
return resultSetMapper.mapResultSet(rs);
} catch (Exception e) {
close();
throw new EdcException(e);
}
}
}
About this issue
- Original URL
- State: closed
- Created 7 months ago
- Comments: 17 (17 by maintainers)
Just for the record: here’s the test replicated (needs updated
PostgresStoreSetupExtensionas well: https://github.com/Think-iT-Labs/edc-connector/blob/3640-fetch-size-limit/extensions/control-plane/store/sql/control-plane-sql/src/test/java/org/eclipse/edc/connector/store/sql/catalog/PostgresDatasetResolverImplTest.java