Skip to content

Commit

Permalink
Add fetchSize to ReactiveSelectOperationSupport.
Browse files Browse the repository at this point in the history
Closes #1652
Original pull request: #1898
  • Loading branch information
mipo256 authored and mp911de committed Oct 1, 2024
1 parent 7cf81ae commit 96a4121
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@
* @author Jose Luis Leon
* @author Robert Heim
* @author Sebastian Wieland
* @author Mikhail Polivakha
* @since 1.1
*/
public class R2dbcEntityTemplate implements R2dbcEntityOperations, BeanFactoryAware, ApplicationContextAware {
Expand Down Expand Up @@ -312,14 +313,14 @@ public <T> Flux<T> select(Query query, Class<T> entityClass) throws DataAccessEx
Assert.notNull(entityClass, "Entity class must not be null");

SqlIdentifier tableName = getTableName(entityClass);
return doSelect(query, entityClass, tableName, entityClass, RowsFetchSpec::all);
return doSelect(query, entityClass, tableName, entityClass, RowsFetchSpec::all, null);
}

@SuppressWarnings("unchecked")
<T, P extends Publisher<T>> P doSelect(Query query, Class<?> entityClass, SqlIdentifier tableName,
Class<T> returnType, Function<RowsFetchSpec<T>, P> resultHandler) {
Class<T> returnType, Function<RowsFetchSpec<T>, P> resultHandler, @Nullable Integer fetchSize) {

RowsFetchSpec<T> fetchSpec = doSelect(query, entityClass, tableName, returnType);
RowsFetchSpec<T> fetchSpec = doSelect(query, entityClass, tableName, returnType, fetchSize);

P result = resultHandler.apply(fetchSpec);

Expand All @@ -331,7 +332,7 @@ <T, P extends Publisher<T>> P doSelect(Query query, Class<?> entityClass, SqlIde
}

private <T> RowsFetchSpec<T> doSelect(Query query, Class<?> entityType, SqlIdentifier tableName,
Class<T> returnType) {
Class<T> returnType, @Nullable Integer fetchSize) {

StatementMapper statementMapper = dataAccessStrategy.getStatementMapper().forType(entityType);

Expand All @@ -358,13 +359,17 @@ private <T> RowsFetchSpec<T> doSelect(Query query, Class<?> entityType, SqlIdent

PreparedOperation<?> operation = statementMapper.getMappedObject(selectSpec);

return getRowsFetchSpec(databaseClient.sql(operation), entityType, returnType);
return getRowsFetchSpec(
databaseClient.sql(operation).filter((statement) -> statement.fetchSize(Optional.ofNullable(fetchSize).orElse(0))),
entityType,
returnType
);
}

@Override
public <T> Mono<T> selectOne(Query query, Class<T> entityClass) throws DataAccessException {
return doSelect(query.isLimited() ? query : query.limit(2), entityClass, getTableName(entityClass), entityClass,
RowsFetchSpec::one);
RowsFetchSpec::one, null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
* <pre>
* <code>
* select(Human.class)
* .withFetchSize(10)
* .from("star_wars")
* .as(Jedi.class)
* .matching(query(where("firstname").is("luke")))
Expand All @@ -44,6 +45,7 @@
* </pre>
*
* @author Mark Paluch
* @author Mikhail Polivakha
* @since 1.1
*/
public interface ReactiveSelectOperation {
Expand Down Expand Up @@ -115,6 +117,14 @@ interface SelectWithProjection<T> extends SelectWithQuery<T> {
*/
interface SelectWithQuery<T> extends TerminatingSelect<T> {

/**
* Specifies the fetch size for this query
*
* @param fetchSize
* @return
*/
SelectWithQuery<T> withFetchSize(int fetchSize);

/**
* Set the {@link Query} used as a filter in the {@code SELECT} statement.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
* Implementation of {@link ReactiveSelectOperation}.
*
* @author Mark Paluch
* @author Mikhail Polivakha
* @since 1.1
*/
class ReactiveSelectOperationSupport implements ReactiveSelectOperation {
Expand All @@ -43,7 +44,7 @@ public <T> ReactiveSelect<T> select(Class<T> domainType) {

Assert.notNull(domainType, "DomainType must not be null");

return new ReactiveSelectSupport<>(this.template, domainType, domainType, Query.empty(), null);
return new ReactiveSelectSupport<>(this.template, domainType, domainType, Query.empty(), null, null);
}

static class ReactiveSelectSupport<T> implements ReactiveSelect<T> {
Expand All @@ -54,38 +55,49 @@ static class ReactiveSelectSupport<T> implements ReactiveSelect<T> {
private final Query query;
private final @Nullable SqlIdentifier tableName;

private final @Nullable Integer fetchSize;

ReactiveSelectSupport(R2dbcEntityTemplate template, Class<?> domainType, Class<T> returnType, Query query,
@Nullable SqlIdentifier tableName) {
@Nullable SqlIdentifier tableName, @Nullable Integer fetchSize) {

this.template = template;
this.domainType = domainType;
this.returnType = returnType;
this.query = query;
this.tableName = tableName;
this.fetchSize = fetchSize;
}

@Override
public SelectWithProjection<T> from(SqlIdentifier tableName) {

Assert.notNull(tableName, "Table name must not be null");

return new ReactiveSelectSupport<>(template, domainType, returnType, query, tableName);
return new ReactiveSelectSupport<>(template, domainType, returnType, query, tableName, fetchSize);
}

@Override
public <R> SelectWithQuery<R> as(Class<R> returnType) {

Assert.notNull(returnType, "ReturnType must not be null");

return new ReactiveSelectSupport<>(template, domainType, returnType, query, tableName);
return new ReactiveSelectSupport<>(template, domainType, returnType, query, tableName, fetchSize);
}

@Override
public SelectWithQuery<T> withFetchSize(int fetchSize) {

Assert.notNull(returnType, "FetchSize must not be null");

return new ReactiveSelectSupport<>(template, domainType, returnType, query, tableName, fetchSize);
}

@Override
public TerminatingSelect<T> matching(Query query) {

Assert.notNull(query, "Query must not be null");

return new ReactiveSelectSupport<>(template, domainType, returnType, query, tableName);
return new ReactiveSelectSupport<>(template, domainType, returnType, query, tableName, fetchSize);
}

@Override
Expand All @@ -100,17 +112,17 @@ public Mono<Boolean> exists() {

@Override
public Mono<T> first() {
return template.doSelect(query.limit(1), domainType, getTableName(), returnType, RowsFetchSpec::first);
return template.doSelect(query.limit(1), domainType, getTableName(), returnType, RowsFetchSpec::first, fetchSize);
}

@Override
public Mono<T> one() {
return template.doSelect(query.limit(2), domainType, getTableName(), returnType, RowsFetchSpec::one);
return template.doSelect(query.limit(2), domainType, getTableName(), returnType, RowsFetchSpec::one, fetchSize);
}

@Override
public Flux<T> all() {
return template.doSelect(query, domainType, getTableName(), returnType, RowsFetchSpec::all);
return template.doSelect(query, domainType, getTableName(), returnType, RowsFetchSpec::all, fetchSize);
}

private SqlIdentifier getTableName() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.r2dbc.spi.test.MockResult;
import io.r2dbc.spi.test.MockRow;
import io.r2dbc.spi.test.MockRowMetadata;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;

import org.junit.jupiter.api.BeforeEach;
Expand All @@ -38,6 +39,7 @@
* Unit test for {@link ReactiveSelectOperation}.
*
* @author Mark Paluch
* @author Mikhail Polivakha
*/
public class ReactiveSelectOperationUnitTests {

Expand Down Expand Up @@ -242,6 +244,32 @@ void shouldSelectCount() {
assertThat(statement.getSql()).isEqualTo("SELECT COUNT(*) FROM person WHERE person.THE_NAME = $1");
}

@Test // gh-1652
void shouldBeAbleToProvideFetchSize() {
MockRowMetadata metadata = MockRowMetadata.builder()
.columnMetadata(MockColumnMetadata.builder().name("id").type(R2dbcType.INTEGER).build())
.build();
MockResult result = MockResult.builder()
.row(MockRow.builder().identified("id", Object.class, "Walter").metadata(metadata).build())
.build();

recorder.addStubbing(s -> s.startsWith("SELECT"), result);

entityTemplate.select(Person.class) //
.withFetchSize(10)
.matching(query(where("name").is("Walter")).limit(10).offset(20)) //
.all() //
.as(StepVerifier::create) //
.expectNextCount(1) //
.verifyComplete();

StatementRecorder.RecordedStatement statement = recorder.getCreatedStatement(s -> s.startsWith("SELECT"));

assertThat(statement.getSql())
.isEqualTo("SELECT person.* FROM person WHERE person.THE_NAME = $1 LIMIT 10 OFFSET 20");
assertThat(statement.getFetchSize()).isEqualTo(10);
}

static class Person {

@Id String id;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
* Recorder utility for R2DBC {@link Statement}s. Allows stubbing and introspection.
*
* @author Mark Paluch
* @author Mikhail Polivakha
*/
public class StatementRecorder implements ConnectionFactory {

Expand Down Expand Up @@ -273,6 +274,8 @@ public class RecordedStatement implements Statement {

private final List<Result> results;

private int fetchSize;

private final Map<Object, Parameter> bindings = new LinkedHashMap<>();

public RecordedStatement(String sql, Result result) {
Expand All @@ -292,6 +295,10 @@ public String getSql() {
return sql;
}

public int getFetchSize() {
return fetchSize;
}

@Override
public Statement add() {
return this;
Expand Down Expand Up @@ -321,6 +328,12 @@ public Statement bindNull(String identifier, Class<?> type) {
return this;
}

@Override
public Statement fetchSize(int rows) {
fetchSize = rows;
return this;
}

@Override
public Flux<Result> execute() {
return Flux.fromIterable(results).doOnSubscribe(subscription -> executedStatements.add(this));
Expand Down

0 comments on commit 96a4121

Please # to comment.