Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Added fetchSize to ReactiveSelectOperationSupport #1898

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@
* @author Jose Luis Leon
* @author Robert Heim
* @author Sebastian Wieland
* @author Mikhail Polivakha
* @since 1.1
*/
public class R2dbcEntityTemplate implements R2dbcEntityOperations, BeanFactoryAware, ApplicationContextAware {
Expand Down Expand Up @@ -312,14 +313,14 @@ public <T> Flux<T> select(Query query, Class<T> entityClass) throws DataAccessEx
Assert.notNull(entityClass, "Entity class must not be null");

SqlIdentifier tableName = getTableName(entityClass);
return doSelect(query, entityClass, tableName, entityClass, RowsFetchSpec::all);
return doSelect(query, entityClass, tableName, entityClass, RowsFetchSpec::all, null);
}

@SuppressWarnings("unchecked")
<T, P extends Publisher<T>> P doSelect(Query query, Class<?> entityClass, SqlIdentifier tableName,
Class<T> returnType, Function<RowsFetchSpec<T>, P> resultHandler) {
Class<T> returnType, Function<RowsFetchSpec<T>, P> resultHandler, @Nullable Integer fetchSize) {

RowsFetchSpec<T> fetchSpec = doSelect(query, entityClass, tableName, returnType);
RowsFetchSpec<T> fetchSpec = doSelect(query, entityClass, tableName, returnType, fetchSize);

P result = resultHandler.apply(fetchSpec);

Expand All @@ -331,7 +332,7 @@ <T, P extends Publisher<T>> P doSelect(Query query, Class<?> entityClass, SqlIde
}

private <T> RowsFetchSpec<T> doSelect(Query query, Class<?> entityType, SqlIdentifier tableName,
Class<T> returnType) {
Class<T> returnType, @Nullable Integer fetchSize) {

StatementMapper statementMapper = dataAccessStrategy.getStatementMapper().forType(entityType);

Expand All @@ -358,13 +359,17 @@ private <T> RowsFetchSpec<T> doSelect(Query query, Class<?> entityType, SqlIdent

PreparedOperation<?> operation = statementMapper.getMappedObject(selectSpec);

return getRowsFetchSpec(databaseClient.sql(operation), entityType, returnType);
return getRowsFetchSpec(
databaseClient.sql(operation).filter((statement) -> statement.fetchSize(Optional.ofNullable(fetchSize).orElse(0))),
entityType,
returnType
);
}

@Override
public <T> Mono<T> selectOne(Query query, Class<T> entityClass) throws DataAccessException {
return doSelect(query.isLimited() ? query : query.limit(2), entityClass, getTableName(entityClass), entityClass,
RowsFetchSpec::one);
RowsFetchSpec::one, null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
* <pre>
* <code>
* select(Human.class)
* .withFetchSize(10)
* .from("star_wars")
* .as(Jedi.class)
* .matching(query(where("firstname").is("luke")))
Expand All @@ -44,6 +45,7 @@
* </pre>
*
* @author Mark Paluch
* @author Mikhail Polivakha
* @since 1.1
*/
public interface ReactiveSelectOperation {
Expand Down Expand Up @@ -115,6 +117,14 @@ interface SelectWithProjection<T> extends SelectWithQuery<T> {
*/
interface SelectWithQuery<T> extends TerminatingSelect<T> {

/**
* Specifies the fetch size for this query
*
* @param fetchSize
* @return
*/
SelectWithQuery<T> withFetchSize(int fetchSize);

/**
* Set the {@link Query} used as a filter in the {@code SELECT} statement.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
* Implementation of {@link ReactiveSelectOperation}.
*
* @author Mark Paluch
* @author Mikhail Polivakha
* @since 1.1
*/
class ReactiveSelectOperationSupport implements ReactiveSelectOperation {
Expand All @@ -43,7 +44,7 @@ public <T> ReactiveSelect<T> select(Class<T> domainType) {

Assert.notNull(domainType, "DomainType must not be null");

return new ReactiveSelectSupport<>(this.template, domainType, domainType, Query.empty(), null);
return new ReactiveSelectSupport<>(this.template, domainType, domainType, Query.empty(), null, null);
}

static class ReactiveSelectSupport<T> implements ReactiveSelect<T> {
Expand All @@ -54,38 +55,49 @@ static class ReactiveSelectSupport<T> implements ReactiveSelect<T> {
private final Query query;
private final @Nullable SqlIdentifier tableName;

private final @Nullable Integer fetchSize;

ReactiveSelectSupport(R2dbcEntityTemplate template, Class<?> domainType, Class<T> returnType, Query query,
@Nullable SqlIdentifier tableName) {
@Nullable SqlIdentifier tableName, @Nullable Integer fetchSize) {

this.template = template;
this.domainType = domainType;
this.returnType = returnType;
this.query = query;
this.tableName = tableName;
this.fetchSize = fetchSize;
}

@Override
public SelectWithProjection<T> from(SqlIdentifier tableName) {

Assert.notNull(tableName, "Table name must not be null");

return new ReactiveSelectSupport<>(template, domainType, returnType, query, tableName);
return new ReactiveSelectSupport<>(template, domainType, returnType, query, tableName, fetchSize);
}

@Override
public <R> SelectWithQuery<R> as(Class<R> returnType) {

Assert.notNull(returnType, "ReturnType must not be null");

return new ReactiveSelectSupport<>(template, domainType, returnType, query, tableName);
return new ReactiveSelectSupport<>(template, domainType, returnType, query, tableName, fetchSize);
}

@Override
public SelectWithQuery<T> withFetchSize(int fetchSize) {

Assert.notNull(returnType, "FetchSize must not be null");

return new ReactiveSelectSupport<>(template, domainType, returnType, query, tableName, fetchSize);
}

@Override
public TerminatingSelect<T> matching(Query query) {

Assert.notNull(query, "Query must not be null");

return new ReactiveSelectSupport<>(template, domainType, returnType, query, tableName);
return new ReactiveSelectSupport<>(template, domainType, returnType, query, tableName, fetchSize);
}

@Override
Expand All @@ -100,17 +112,17 @@ public Mono<Boolean> exists() {

@Override
public Mono<T> first() {
return template.doSelect(query.limit(1), domainType, getTableName(), returnType, RowsFetchSpec::first);
return template.doSelect(query.limit(1), domainType, getTableName(), returnType, RowsFetchSpec::first, fetchSize);
}

@Override
public Mono<T> one() {
return template.doSelect(query.limit(2), domainType, getTableName(), returnType, RowsFetchSpec::one);
return template.doSelect(query.limit(2), domainType, getTableName(), returnType, RowsFetchSpec::one, fetchSize);
}

@Override
public Flux<T> all() {
return template.doSelect(query, domainType, getTableName(), returnType, RowsFetchSpec::all);
return template.doSelect(query, domainType, getTableName(), returnType, RowsFetchSpec::all, fetchSize);
}

private SqlIdentifier getTableName() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.r2dbc.spi.test.MockResult;
import io.r2dbc.spi.test.MockRow;
import io.r2dbc.spi.test.MockRowMetadata;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;

import org.junit.jupiter.api.BeforeEach;
Expand All @@ -38,6 +39,7 @@
* Unit test for {@link ReactiveSelectOperation}.
*
* @author Mark Paluch
* @author Mikhail Polivakha
*/
public class ReactiveSelectOperationUnitTests {

Expand Down Expand Up @@ -242,6 +244,32 @@ void shouldSelectCount() {
assertThat(statement.getSql()).isEqualTo("SELECT COUNT(*) FROM person WHERE person.THE_NAME = $1");
}

@Test // gh-1652
void shouldBeAbleToProvideFetchSize() {
MockRowMetadata metadata = MockRowMetadata.builder()
.columnMetadata(MockColumnMetadata.builder().name("id").type(R2dbcType.INTEGER).build())
.build();
MockResult result = MockResult.builder()
.row(MockRow.builder().identified("id", Object.class, "Walter").metadata(metadata).build())
.build();

recorder.addStubbing(s -> s.startsWith("SELECT"), result);

entityTemplate.select(Person.class) //
.withFetchSize(10)
.matching(query(where("name").is("Walter")).limit(10).offset(20)) //
.all() //
.as(StepVerifier::create) //
.expectNextCount(1) //
.verifyComplete();

StatementRecorder.RecordedStatement statement = recorder.getCreatedStatement(s -> s.startsWith("SELECT"));

assertThat(statement.getSql())
.isEqualTo("SELECT person.* FROM person WHERE person.THE_NAME = $1 LIMIT 10 OFFSET 20");
assertThat(statement.getFetchSize()).isEqualTo(10);
}

static class Person {

@Id String id;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
* Recorder utility for R2DBC {@link Statement}s. Allows stubbing and introspection.
*
* @author Mark Paluch
* @author Mikhail Polivakha
*/
public class StatementRecorder implements ConnectionFactory {

Expand Down Expand Up @@ -273,6 +274,8 @@ public class RecordedStatement implements Statement {

private final List<Result> results;

private int fetchSize;

private final Map<Object, Parameter> bindings = new LinkedHashMap<>();

public RecordedStatement(String sql, Result result) {
Expand All @@ -292,6 +295,10 @@ public String getSql() {
return sql;
}

public int getFetchSize() {
return fetchSize;
}

@Override
public Statement add() {
return this;
Expand Down Expand Up @@ -321,6 +328,12 @@ public Statement bindNull(String identifier, Class<?> type) {
return this;
}

@Override
public Statement fetchSize(int rows) {
fetchSize = rows;
return this;
}

@Override
public Flux<Result> execute() {
return Flux.fromIterable(results).doOnSubscribe(subscription -> executedStatements.add(this));
Expand Down
Loading