From 8956b6ab621bd0de30e9976a398e649e59788976 Mon Sep 17 00:00:00 2001 From: Dmitry Yemanov Date: Fri, 26 Nov 2021 09:35:31 +0300 Subject: [PATCH] Implemented #7051: Network support for bi-directional cursors --- src/remote/client/interface.cpp | 759 ++++++++++++++++++++------------ src/remote/protocol.cpp | 24 +- src/remote/protocol.h | 20 +- src/remote/remote.h | 26 +- src/remote/server/server.cpp | 174 ++++++-- src/yvalve/why.cpp | 14 +- 6 files changed, 681 insertions(+), 336 deletions(-) diff --git a/src/remote/client/interface.cpp b/src/remote/client/interface.cpp index d55ef230694..687073b30eb 100644 --- a/src/remote/client/interface.cpp +++ b/src/remote/client/interface.cpp @@ -307,6 +307,7 @@ class ResultSet FB_FINAL : public RefCntIfacep_sqldata_out_blr.cstr_address = const_cast(out_blr); sqldata->p_sqldata_out_message_number = 0; // out_msg_type sqldata->p_sqldata_timeout = statement->rsr_timeout; + sqldata->p_sqldata_cursor_flags = 0; send_packet(port, packet); @@ -3489,7 +3491,7 @@ ITransaction* Statement::execute(CheckStatusWrapper* status, ITransaction* apiTr ResultSet* Statement::openCursor(CheckStatusWrapper* status, Firebird::ITransaction* apiTra, - IMessageMetadata* inMetadata, void* inBuffer, IMessageMetadata* outFormat, unsigned int /*flags*/) + IMessageMetadata* inMetadata, void* inBuffer, IMessageMetadata* outFormat, unsigned int flags) { /************************************** * @@ -3607,6 +3609,7 @@ ResultSet* Statement::openCursor(CheckStatusWrapper* status, Firebird::ITransact sqldata->p_sqldata_out_blr.cstr_address = const_cast(out_blr); sqldata->p_sqldata_out_message_number = 0; // out_msg_type sqldata->p_sqldata_timeout = statement->rsr_timeout; + sqldata->p_sqldata_cursor_flags = flags; send_partial_packet(port, packet); defer_packet(port, packet, true); @@ -4361,6 +4364,89 @@ ISC_UINT64 Statement::getAffectedRecords(CheckStatusWrapper* status) } +void Statement::setCursorName(CheckStatusWrapper* status, const char* cursor) +{ +/***************************************** + * + * d s q l _ s e t _ c u r s o r + * + ***************************************** + * + * Functional Description + * Declare a cursor for a dynamic request. + * + * Note: prior to version 6.0, this function terminated the + * cursor name at the first blank. With delimited cursor + * name support that is no longer sufficient. We now pass + * the entire NULL-Terminated cursor name to the server, and let + * the server deal with blank termination or not. + * NOTE: THIS NOW MEANS THAT IF CURSOR is NOT null terminated + * we will have inconsistant results with version 5.x. The only + * "normal" way this happens is if this API is called from a + * non-C host language. If that results in a later problem we + * must provide a new API that takes a "cursor_name_length" + * parameter. + * + *****************************************/ + + try + { + reset(status); + + // Check and validate handles, etc. + + Rsr* statement = getStatement(); + CHECK_HANDLE(statement, isc_bad_req_handle); + Rdb* rdb = statement->rsr_rdb; + rem_port* port = rdb->rdb_port; + RefMutexGuard portGuard(*port->port_sync, FB_FUNCTION); + + statement->raiseException(); + + // set up the packet for the other guy... + + PACKET* packet = &rdb->rdb_packet; + + if (statement->rsr_flags.test(Rsr::LAZY)) + { + packet->p_operation = op_allocate_statement; + packet->p_rlse.p_rlse_object = rdb->rdb_id; + + send_partial_packet(rdb->rdb_port, packet); + } + + packet->p_operation = op_set_cursor; + P_SQLCUR* sqlcur = &packet->p_sqlcur; + sqlcur->p_sqlcur_statement = statement->rsr_id; + + const ULONG name_l = static_cast(strlen(cursor)); + sqlcur->p_sqlcur_cursor_name.cstr_length = name_l + 1; + sqlcur->p_sqlcur_cursor_name.cstr_address = reinterpret_cast(cursor); + sqlcur->p_sqlcur_type = 0; // type + + send_packet(port, packet); + + if (statement->rsr_flags.test(Rsr::LAZY)) + { + receive_response(status, rdb, packet); + + statement->rsr_id = packet->p_resp.p_resp_object; + SET_OBJECT(rdb, statement, statement->rsr_id); + + statement->rsr_flags.clear(Rsr::LAZY); + } + + receive_response(status, rdb, packet); + + statement->raiseException(); + } + catch (const Exception& ex) + { + ex.stuffException(status); + } +} + + void ResultSet::setDelayedOutputFormat(CheckStatusWrapper* status, IMessageMetadata* format) { try @@ -4383,7 +4469,7 @@ void ResultSet::setDelayedOutputFormat(CheckStatusWrapper* status, IMessageMetad } -int ResultSet::fetchNext(CheckStatusWrapper* status, void* buffer) +bool ResultSet::fetch(CheckStatusWrapper* status, void* buffer, P_FETCH operation, int position) { /************************************** * @@ -4396,250 +4482,422 @@ int ResultSet::fetchNext(CheckStatusWrapper* status, void* buffer) * **************************************/ - try + reset(status); + + // Check and validate handles, etc. + + if (delayedFormat || !stmt) { - reset(status); + (Arg::Gds(isc_dsql_cursor_err) << Arg::Gds(isc_bad_req_handle)).raise(); + } - // Check and validate handles, etc. + Rsr* const statement = stmt->getStatement(); + CHECK_HANDLE(statement, isc_bad_req_handle); + + Rdb* const rdb = statement->rsr_rdb; + CHECK_HANDLE(rdb, isc_bad_db_handle); + + rem_port* const port = rdb->rdb_port; + + // Scrolling is not available in older protocols + if (operation != fetch_next && port->port_protocol < PROTOCOL_FETCH_SCROLL) + unsupported(); + + // Whether we're fetching in the forward direction + const bool forward = + (operation == fetch_next || operation == fetch_last || + ((operation == fetch_absolute || operation == fetch_relative) && position > 0)); + + // Whether we're fetching relatively to the current position + const bool relative = + (operation == fetch_next || operation == fetch_prior || operation == fetch_relative); + + BlrFromMessage outBlr(outputFormat, stmt->getDialect(), port->port_protocol); + unsigned int blr_length = outBlr.getLength(); + const UCHAR* blr = outBlr.getBytes(); + const unsigned int msg_length = outBlr.getMsgLength(); + UCHAR* msg = static_cast(buffer); + + // Validate data length - if (delayedFormat || !stmt) + CHECK_LENGTH(port, blr_length); + CHECK_LENGTH(port, msg_length); + + RefMutexGuard portGuard(*port->port_sync, FB_FUNCTION); + + if (!statement->rsr_flags.test(Rsr::FETCHED)) + { + // On first fetch, clear the end-of-stream flag & reset the message buffers + + statement->raiseException(); + + statement->rsr_flags.clear(Rsr::STREAM_END | Rsr::PAST_END | Rsr::STREAM_ERR); + statement->rsr_rows_pending = 0; + statement->rsr_fetch_operation = operation; + statement->rsr_fetch_position = position; + statement->clearException(); + + RMessage* message = statement->rsr_message; + if (message) { - (Arg::Gds(isc_dsql_cursor_err) << Arg::Gds(isc_bad_req_handle)).raise(); + statement->rsr_buffer = message; + + while (true) + { + message->msg_address = NULL; + message = message->msg_next; + + if (message == statement->rsr_message) + break; + } } - Rsr* statement = stmt->getStatement(); - CHECK_HANDLE(statement, isc_bad_req_handle); + } + else if (relative) + { + // For the relative fetch, check whether end-of-stream is already reached - Rdb* rdb = statement->rsr_rdb; - CHECK_HANDLE(rdb, isc_bad_db_handle); + if (forward) + { + if (statement->rsr_flags.testAll(Rsr::EOF_SET | Rsr::PAST_EOF)) + Arg::Gds(isc_req_sync).raise(); + } + else + { + if (statement->rsr_flags.testAll(Rsr::BOF_SET | Rsr::PAST_BOF)) + Arg::Gds(isc_req_sync).raise(); + } + } + else + { + // Clear the end-of-stream flag if the fetch is positioned absolutely + statement->rsr_flags.clear(Rsr::STREAM_END | Rsr::PAST_END); + } - rem_port* port = rdb->rdb_port; + // Parse the blr describing the message, if there is any. - BlrFromMessage outBlr(outputFormat, stmt->getDialect(), port->port_protocol); - unsigned int blr_length = outBlr.getLength(); - const UCHAR* blr = outBlr.getBytes(); - const unsigned int msg_length = outBlr.getMsgLength(); - UCHAR* msg = static_cast(buffer); + if (blr_length) + { + if (statement->rsr_user_select_format && + statement->rsr_user_select_format != statement->rsr_select_format) + { + delete statement->rsr_user_select_format; + } - // Validate data length + statement->rsr_user_select_format = PARSE_msg_format(blr, blr_length); - CHECK_LENGTH(port, blr_length); - CHECK_LENGTH(port, msg_length); + if (statement->rsr_flags.test(Rsr::FETCHED)) + blr_length = 0; + else + { + delete statement->rsr_select_format; + statement->rsr_select_format = statement->rsr_user_select_format; + } + } - RefMutexGuard portGuard(*port->port_sync, FB_FUNCTION); + if (!statement->rsr_buffer) + { + statement->rsr_buffer = FB_NEW RMessage(0); + statement->rsr_message = statement->rsr_buffer; + statement->rsr_message->msg_next = statement->rsr_message; + statement->rsr_fmt_length = 0; + } - // On first fetch, clear the end-of-stream flag & reset the message buffers + RMessage* message = statement->rsr_message; - if (!statement->rsr_flags.test(Rsr::FETCHED)) +#ifdef DEBUG + fprintf(stdout, "Rows Pending in REM_fetch=%lu\n", statement->rsr_rows_pending); +#endif + + // If the fetch direction was changed, we don't need the batched rows anymore. + // Swallow them and reset the stream for subsequent fetches. + + if (operation != statement->rsr_fetch_operation || + position != statement->rsr_fetch_position) + { + while (statement->rsr_rows_pending) + receive_queued_packet(port, statement->rsr_id); + + if (statement->rsr_flags.test(Rsr::STREAM_ERR)) { + statement->rsr_flags.clear(Rsr::STREAM_ERR); + + // hvlad: prevent subsequent fetches + statement->rsr_flags.set(Rsr::STREAM_END); statement->raiseException(); + } - statement->rsr_flags.clear(Rsr::EOF_SET | Rsr::STREAM_ERR | Rsr::PAST_EOF); - statement->rsr_rows_pending = 0; - statement->clearException(); + statement->rsr_flags.clear(Rsr::STREAM_END | Rsr::PAST_END); + + // We have some messages in the queue. Reset them for reuse. + + if (statement->rsr_msgs_waiting) + { + fb_assert(statement->rsr_fetch_operation == fetch_next || + statement->rsr_fetch_operation == fetch_prior); RMessage* message = statement->rsr_message; if (message) { statement->rsr_buffer = message; + while (true) { message->msg_address = NULL; message = message->msg_next; - if (message == statement->rsr_message) { + + if (message == statement->rsr_message) break; - } } } - } - else if (statement->rsr_flags.testAll(Rsr::EOF_SET | Rsr::PAST_EOF)) - { - Arg::Gds(isc_req_sync).raise(); - } - // Parse the blr describing the message, if there is any. + const ULONG offset = statement->rsr_msgs_waiting + 1; + statement->rsr_msgs_waiting = 0; - if (blr_length) - { - if (statement->rsr_user_select_format && - statement->rsr_user_select_format != statement->rsr_select_format) + // If we had some rows batched and the requested scrolling is relative, + // then move the server cursor to the actual client's position before proceeding. + // We don't know the absolute client's position, but it's not really necessary. + // rsr_msgs_waiting shows how much we're ahead the server, so we may re-position + // the cursor relatively. + + if (relative) { - delete statement->rsr_user_select_format; - } + const bool isAhead = (statement->rsr_fetch_operation == fetch_next); + + PACKET* packet = &rdb->rdb_packet; + packet->p_operation = op_fetch_scroll; + P_SQLDATA* sqldata = &packet->p_sqldata; + sqldata->p_sqldata_statement = statement->rsr_id; + sqldata->p_sqldata_blr.cstr_length = 0; + sqldata->p_sqldata_blr.cstr_address = nullptr; + sqldata->p_sqldata_message_number = 0; // msg_type + sqldata->p_sqldata_messages = statement->rsr_select_format ? 1 : 0; + sqldata->p_sqldata_fetch_op = fetch_relative; + sqldata->p_sqldata_fetch_pos = isAhead ? -offset : offset; + + send_packet(port, packet); + + // Receive response packets. If everything is OK, there should be two of them: + // first with packet->p_sqldata.p_sqldata_messages == 1 and second with + // packet->p_sqldata.p_sqldata_messages == 0 (end-of-batch). + + do + { + receive_packet(rdb->rdb_port, packet); - statement->rsr_user_select_format = PARSE_msg_format(blr, blr_length); + // If we get an error, handle it + if (packet->p_operation != op_fetch_response) + { + statement->rsr_flags.set(Rsr::STREAM_ERR); + REMOTE_check_response(status, rdb, packet); + break; + } - if (statement->rsr_flags.test(Rsr::FETCHED)) - blr_length = 0; - else - { - delete statement->rsr_select_format; - statement->rsr_select_format = statement->rsr_user_select_format; + // If we get end-of-stream, something went seriously wrong, thus punt + if (packet->p_sqldata.p_sqldata_status == 100) + Arg::Gds(isc_req_sync).raise(); + + // We should get either the requested row or the end-of-batch marker + fb_assert(packet->p_sqldata.p_sqldata_messages == 0 || + packet->p_sqldata.p_sqldata_messages == 1); + + // Release the received message, we don't need it + const auto message = statement->rsr_message; + if (message && message->msg_address) + { + statement->rsr_message = message->msg_next; + message->msg_address = NULL; + } + } + while (packet->p_sqldata.p_sqldata_messages); } } - if (!statement->rsr_buffer) + // These are the necessary conditions to continue fetching (see below) + fb_assert(!statement->rsr_flags.test(Rsr::STREAM_END | Rsr::STREAM_ERR)); + fb_assert(!statement->rsr_message->msg_address); + fb_assert(!statement->rsr_rows_pending); + } + + // Check to see if data is waiting. If not, solicite data. + + if ((!statement->rsr_flags.test(Rsr::STREAM_END | Rsr::STREAM_ERR) && + !statement->rsr_message->msg_address && !statement->rsr_rows_pending) || + ( // Low in inventory + (statement->rsr_rows_pending <= statement->rsr_reorder_level) && + (statement->rsr_msgs_waiting <= statement->rsr_reorder_level) && + // not using named pipe on NT + // Pipelining causes both server & client to + // write at the same time. In named pipes, writes + // block for the other end to read - and so when both + // attempt to write simultaneously, they end up + // waiting indefinitely for the other end to read + (port->port_type != rem_port::PIPE) && + (port->port_type != rem_port::XNET) && + // We're fetching either forward or backward + (operation == fetch_next || operation == fetch_prior) && + // We've reached end-of-stream or there was an error + !statement->rsr_flags.test(Rsr::STREAM_END | Rsr::STREAM_ERR) && + // No error pending + !statement->haveException() )) + { + // set up the packet for the other guy... + + PACKET* packet = &rdb->rdb_packet; + packet->p_operation = (operation == fetch_next) ? op_fetch : op_fetch_scroll; + P_SQLDATA* sqldata = &packet->p_sqldata; + sqldata->p_sqldata_statement = statement->rsr_id; + sqldata->p_sqldata_blr.cstr_length = blr_length; + sqldata->p_sqldata_blr.cstr_address = const_cast(blr); + sqldata->p_sqldata_message_number = 0; // msg_type + sqldata->p_sqldata_messages = statement->rsr_select_format ? 1 : 0; + sqldata->p_sqldata_fetch_op = operation; + sqldata->p_sqldata_fetch_pos = position; + + if (statement->rsr_select_format) { - statement->rsr_buffer = FB_NEW RMessage(0); - statement->rsr_message = statement->rsr_buffer; - statement->rsr_message->msg_next = statement->rsr_message; - statement->rsr_fmt_length = 0; - } + if (operation == fetch_next || operation == fetch_prior) + { + sqldata->p_sqldata_messages = REMOTE_compute_batch_size( + port, 0, op_fetch_response, statement->rsr_select_format); + } - RMessage* message = statement->rsr_message; + // Reorder data when the local buffer is half empty + statement->rsr_reorder_level = sqldata->p_sqldata_messages / 2; #ifdef DEBUG - fprintf(stdout, "Rows Pending in REM_fetch=%lu\n", statement->rsr_rows_pending); + fprintf(stdout, "Recalculating Rows Pending in REM_fetch=%lu\n", + statement->rsr_rows_pending); #endif + } - // Check to see if data is waiting. If not, solicite data. - - if ((!statement->rsr_flags.test(Rsr::EOF_SET | Rsr::STREAM_ERR) && - (!statement->rsr_message->msg_address) && (statement->rsr_rows_pending == 0)) || - ( // Low in inventory - (statement->rsr_rows_pending <= statement->rsr_reorder_level) && - (statement->rsr_msgs_waiting <= statement->rsr_reorder_level) && - // not using named pipe on NT - // Pipelining causes both server & client to - // write at the same time. In named pipes, writes - // block for the other end to read - and so when both - // attempt to write simultaenously, they end up - // waiting indefinetly for the other end to read - (port->port_type != rem_port::PIPE) && - (port->port_type != rem_port::XNET) && - // We've reached eof or there was an error - !statement->rsr_flags.test(Rsr::EOF_SET | Rsr::STREAM_ERR) && - // No error pending - !statement->haveException() )) - { - // set up the packet for the other guy... + statement->rsr_rows_pending += sqldata->p_sqldata_messages; - PACKET* packet = &rdb->rdb_packet; - packet->p_operation = op_fetch; - P_SQLDATA* sqldata = &packet->p_sqldata; - sqldata->p_sqldata_statement = statement->rsr_id; - sqldata->p_sqldata_blr.cstr_length = blr_length; - sqldata->p_sqldata_blr.cstr_address = const_cast(blr); - sqldata->p_sqldata_message_number = 0; // msg_type - sqldata->p_sqldata_messages = 0; - if (statement->rsr_select_format) - { - sqldata->p_sqldata_messages = - REMOTE_compute_batch_size(port, 0, op_fetch_response, statement->rsr_select_format); + // We've either got data, or some is on the way, or we have an error, or we have EOF - // Reorder data when the local buffer is half empty + if (!(statement->rsr_msgs_waiting || + statement->rsr_rows_pending || + statement->haveException() || + statement->rsr_flags.test(Rsr::STREAM_END))) + { + // We were asked to fetch from the statement, not ready for it. + // Give up before sending something to the server. + Arg::Gds(isc_req_sync).raise(); + } - statement->rsr_reorder_level = sqldata->p_sqldata_messages / 2; -#ifdef DEBUG - fprintf(stdout, "Recalculating Rows Pending in REM_fetch=%lu\n", - statement->rsr_rows_pending); -#endif - } - statement->rsr_rows_pending += sqldata->p_sqldata_messages; + // Make the batch request - and force the packet over the wire - // We've either got data, or some is on the way, or we have an error, or we have EOF + send_packet(port, packet); - if (!(statement->rsr_msgs_waiting || (statement->rsr_rows_pending > 0) || - statement->haveException() || statement->rsr_flags.test(Rsr::EOF_SET))) - { - // We were asked to fetch from the statement, not ready for it - // Give up before sending something to the server - Arg::Gds(isc_req_sync).raise(); - } + statement->rsr_batch_count++; + statement->rsr_fetch_operation = operation; + statement->rsr_fetch_position = position; - // Make the batch request - and force the packet over the wire + // Queue up receipt of the pending data - send_packet(port, packet); + enqueue_receive(port, batch_dsql_fetch, rdb, statement, NULL); - statement->rsr_batch_count++; + fb_assert(statement->rsr_rows_pending || !statement->rsr_select_format); + } - // Queue up receipt of the pending data + // Receive queued responses until we have some data for this cursor + // or an error status has been received. - enqueue_receive(port, batch_dsql_fetch, rdb, statement, NULL); + // We've either got data, or some is on the way, or we have an error, or we have EOF - fb_assert(statement->rsr_rows_pending > 0 || (!statement->rsr_select_format)); - } + fb_assert(statement->rsr_msgs_waiting || statement->rsr_rows_pending || + statement->haveException() || statement->rsr_flags.test(Rsr::STREAM_END)); - // Receive queued responses until we have some data for this cursor - // or an error status has been received. + while (!statement->haveException() && // received a database error + !statement->rsr_flags.test(Rsr::STREAM_END) && // reached end of stream + statement->rsr_msgs_waiting < 2 && // Have looked ahead for end of batch + statement->rsr_rows_pending) + { + // Hit end of batch + receive_queued_packet(port, statement->rsr_id); + } - // We've either got data, or some is on the way, or we have an error, or we have EOF + if (!statement->rsr_msgs_waiting) + { + if (statement->rsr_flags.test(Rsr::STREAM_END)) + { + // hvlad: we may have queued fetch packet but received end-of-stream before start + // handling of this packet. Handle it now. + clear_stmt_que(port, statement); - fb_assert(statement->rsr_msgs_waiting || (statement->rsr_rows_pending > 0) || - statement->haveException() || statement->rsr_flags.test(Rsr::EOF_SET)); + // hvlad: as we processed all queued packets at code above we can leave Rsr::EOF_SET flag. + // It allows us to return EOF for all subsequent isc_dsql_fetch calls until statement + // will be re-executed (and without roundtrip to remote server). + //statement->rsr_flags.clear(Rsr::STREAM_END); - while (!statement->haveException() && // received a database error - !statement->rsr_flags.test(Rsr::EOF_SET) && // reached end of cursor - statement->rsr_msgs_waiting < 2 && // Have looked ahead for end of batch - statement->rsr_rows_pending != 0) - { - // Hit end of batch - receive_queued_packet(port, statement->rsr_id); - } + if (statement->rsr_flags.test(Rsr::BOF_SET)) + statement->rsr_flags.set(Rsr::PAST_BOF); - if (!statement->rsr_msgs_waiting) - { if (statement->rsr_flags.test(Rsr::EOF_SET)) - { - // hvlad: we may have queued fetch packet but received EOF before start - // handling of this packet. Handle it now. - clear_stmt_que(port, statement); - - // hvlad: as we processed all queued packets at code above we can leave Rsr::EOF_SET flag. - // It allows us to return EOF for all subsequent isc_dsql_fetch calls until statement - // will be re-executed (and without roundtrip to remote server). - //statement->rsr_flags.clear(Rsr::EOF_SET); statement->rsr_flags.set(Rsr::PAST_EOF); - return IStatus::RESULT_NO_DATA; - } + return false; + } - if (statement->rsr_flags.test(Rsr::STREAM_ERR)) - { - // The previous batch of receives ended with an error status. - // We're all done returning data in the local queue. - // Return that error status vector to the user. + if (statement->rsr_flags.test(Rsr::STREAM_ERR)) + { + // The previous batch of receives ended with an error status. + // We're all done returning data in the local queue. + // Return that error status vector to the user. - // Stuff in the error result to the user's vector + // Stuff in the error result to the user's vector - statement->rsr_flags.clear(Rsr::STREAM_ERR); + statement->rsr_flags.clear(Rsr::STREAM_ERR); - // hvlad: prevent subsequent fetches - statement->rsr_flags.set(Rsr::EOF_SET); - statement->raiseException(); - } + // hvlad: prevent subsequent fetches + statement->rsr_flags.set(Rsr::STREAM_END); + statement->raiseException(); } - statement->rsr_msgs_waiting--; + } - message = statement->rsr_message; - statement->rsr_message = message->msg_next; + statement->rsr_msgs_waiting--; - if (statement->rsr_user_select_format->fmt_length != msg_length) - { - status_exception::raise(Arg::Gds(isc_port_len) << - Arg::Num(msg_length) << Arg::Num(statement->rsr_user_select_format->fmt_length)); - } - if (statement->rsr_user_select_format == statement->rsr_select_format) - { - if (!msg || !message->msg_address) - { - move_error(Arg::Gds(isc_dsql_sqlda_err)); - // Msg 263 SQLDA missing or wrong number of variables - } - memcpy(msg, message->msg_address, msg_length); - } - else + message = statement->rsr_message; + statement->rsr_message = message->msg_next; + + if (statement->rsr_user_select_format->fmt_length != msg_length) + { + status_exception::raise(Arg::Gds(isc_port_len) << + Arg::Num(msg_length) << Arg::Num(statement->rsr_user_select_format->fmt_length)); + } + + if (statement->rsr_user_select_format == statement->rsr_select_format) + { + if (!msg || !message->msg_address) { - mov_dsql_message(message->msg_address, statement->rsr_select_format, msg, - statement->rsr_user_select_format); + move_error(Arg::Gds(isc_dsql_sqlda_err)); + // Msg 263 SQLDA missing or wrong number of variables } - message->msg_address = NULL; - return IStatus::RESULT_OK; + memcpy(msg, message->msg_address, msg_length); + } + else + { + mov_dsql_message(message->msg_address, statement->rsr_select_format, msg, + statement->rsr_user_select_format); + } + + message->msg_address = NULL; + return true; +} + + +int ResultSet::fetchNext(CheckStatusWrapper* user_status, void* buffer) +{ + try + { + return fetch(user_status, buffer, fetch_next) ? + IStatus::RESULT_OK : IStatus::RESULT_NO_DATA; } catch (const Exception& ex) { - ex.stuffException(status); + ex.stuffException(user_status); } + return IStatus::RESULT_ERROR; } @@ -4648,15 +4906,15 @@ int ResultSet::fetchPrior(CheckStatusWrapper* user_status, void* buffer) { try { - status_exception::raise(Arg::Gds(isc_wish_list)); + return fetch(user_status, buffer, fetch_prior) ? + IStatus::RESULT_OK : IStatus::RESULT_NO_DATA; } catch (const Exception& ex) { ex.stuffException(user_status); - return FB_FALSE; } - return FB_TRUE; + return IStatus::RESULT_ERROR; } @@ -4664,15 +4922,15 @@ int ResultSet::fetchFirst(CheckStatusWrapper* user_status, void* buffer) { try { - status_exception::raise(Arg::Gds(isc_wish_list)); + return fetch(user_status, buffer, fetch_first) ? + IStatus::RESULT_OK : IStatus::RESULT_NO_DATA; } catch (const Exception& ex) { ex.stuffException(user_status); - return FB_FALSE; } - return FB_TRUE; + return IStatus::RESULT_ERROR; } @@ -4680,15 +4938,15 @@ int ResultSet::fetchLast(CheckStatusWrapper* user_status, void* buffer) { try { - status_exception::raise(Arg::Gds(isc_wish_list)); + return fetch(user_status, buffer, fetch_last) ? + IStatus::RESULT_OK : IStatus::RESULT_NO_DATA; } catch (const Exception& ex) { ex.stuffException(user_status); - return FB_FALSE; } - return FB_TRUE; + return IStatus::RESULT_ERROR; } @@ -4696,15 +4954,15 @@ int ResultSet::fetchAbsolute(CheckStatusWrapper* user_status, int position, void { try { - status_exception::raise(Arg::Gds(isc_wish_list)); + return fetch(user_status, buffer, fetch_absolute, position) ? + IStatus::RESULT_OK : IStatus::RESULT_NO_DATA; } catch (const Exception& ex) { ex.stuffException(user_status); - return FB_FALSE; } - return FB_TRUE; + return IStatus::RESULT_ERROR; } @@ -4712,102 +4970,48 @@ int ResultSet::fetchRelative(CheckStatusWrapper* user_status, int offset, void* { try { - status_exception::raise(Arg::Gds(isc_wish_list)); + return fetch(user_status, buffer, fetch_relative, offset) ? + IStatus::RESULT_OK : IStatus::RESULT_NO_DATA; } catch (const Exception& ex) { ex.stuffException(user_status); - return FB_FALSE; } - return FB_TRUE; + return IStatus::RESULT_ERROR; } -void Statement::setCursorName(CheckStatusWrapper* status, const char* cursor) +FB_BOOLEAN ResultSet::isEof(CheckStatusWrapper* status) { -/***************************************** - * - * d s q l _ s e t _ c u r s o r - * - ***************************************** - * - * Functional Description - * Declare a cursor for a dynamic request. - * - * Note: prior to version 6.0, this function terminated the - * cursor name at the first blank. With delimited cursor - * name support that is no longer sufficient. We now pass - * the entire NULL-Terminated cursor name to the server, and let - * the server deal with blank termination or not. - * NOTE: THIS NOW MEANS THAT IF CURSOR is NOT null terminated - * we will have inconsistant results with version 5.x. The only - * "normal" way this happens is if this API is called from a - * non-C host language. If that results in a later problem we - * must provide a new API that takes a "cursor_name_length" - * parameter. - * - *****************************************/ - try { reset(status); // Check and validate handles, etc. - Rsr* statement = getStatement(); - CHECK_HANDLE(statement, isc_bad_req_handle); - Rdb* rdb = statement->rsr_rdb; - rem_port* port = rdb->rdb_port; - RefMutexGuard portGuard(*port->port_sync, FB_FUNCTION); - - statement->raiseException(); - - // set up the packet for the other guy... - - PACKET* packet = &rdb->rdb_packet; - - if (statement->rsr_flags.test(Rsr::LAZY)) - { - packet->p_operation = op_allocate_statement; - packet->p_rlse.p_rlse_object = rdb->rdb_id; - - send_partial_packet(rdb->rdb_port, packet); - } - - packet->p_operation = op_set_cursor; - P_SQLCUR* sqlcur = &packet->p_sqlcur; - sqlcur->p_sqlcur_statement = statement->rsr_id; - - const ULONG name_l = static_cast(strlen(cursor)); - sqlcur->p_sqlcur_cursor_name.cstr_length = name_l + 1; - sqlcur->p_sqlcur_cursor_name.cstr_address = reinterpret_cast(cursor); - sqlcur->p_sqlcur_type = 0; // type - - send_packet(port, packet); - - if (statement->rsr_flags.test(Rsr::LAZY)) + if (!stmt) { - receive_response(status, rdb, packet); - - statement->rsr_id = packet->p_resp.p_resp_object; - SET_OBJECT(rdb, statement, statement->rsr_id); - - statement->rsr_flags.clear(Rsr::LAZY); + Arg::Gds(isc_dsql_cursor_err).raise(); } + Rsr* statement = stmt->getStatement(); + CHECK_HANDLE(statement, isc_bad_req_handle); - receive_response(status, rdb, packet); + if (!statement->rsr_flags.test(Rsr::FETCHED)) + return FB_FALSE; - statement->raiseException(); + return statement->rsr_flags.test(Rsr::EOF_SET) ? FB_TRUE : FB_FALSE; } catch (const Exception& ex) { ex.stuffException(status); } + + return FB_FALSE; } -FB_BOOLEAN ResultSet::isEof(CheckStatusWrapper* status) +FB_BOOLEAN ResultSet::isBof(CheckStatusWrapper* status) { try { @@ -4822,21 +5026,10 @@ FB_BOOLEAN ResultSet::isEof(CheckStatusWrapper* status) Rsr* statement = stmt->getStatement(); CHECK_HANDLE(statement, isc_bad_req_handle); - return statement->rsr_flags.test(Rsr::EOF_SET) ? FB_TRUE : FB_FALSE; - } - catch (const Exception& ex) - { - ex.stuffException(status); - } - return FB_FALSE; -} - + if (!statement->rsr_flags.test(Rsr::FETCHED)) + return FB_TRUE; -FB_BOOLEAN ResultSet::isBof(CheckStatusWrapper* status) -{ - try - { - status_exception::raise(Arg::Gds(isc_wish_list)); + return statement->rsr_flags.test(Rsr::BOF_SET) ? FB_TRUE : FB_FALSE; } catch (const Exception& ex) { @@ -7287,8 +7480,8 @@ static void batch_dsql_fetch(rem_port* port, // to handoff to them. We'll grab the whole batch when we need to // receive a response for a DIFFERENT network request on the wire, // so we have to clear the wire before the response can be received - // In addtion to the above we grab all the records in case of XNET as - // we need to clear the queue + // In addition to the above we grab all the records in case of XNET as + // we need to clear the queue. const bool clear_queue = (id != statement->rsr_id || port->port_type == rem_port::XNET); statement->rsr_flags.set(Rsr::FETCHED); @@ -7307,9 +7500,9 @@ static void batch_dsql_fetch(rem_port* port, new_msg->msg_next = message; - while (message->msg_next != new_msg->msg_next) { + while (message->msg_next != new_msg->msg_next) message = message->msg_next; - } + message->msg_next = new_msg; } @@ -7322,12 +7515,14 @@ static void batch_dsql_fetch(rem_port* port, statement->rsr_rows_pending = 0; --statement->rsr_batch_count; dequeue_receive(port); + throw; } if (packet->p_operation != op_fetch_response) { statement->rsr_flags.set(Rsr::STREAM_ERR); + try { REMOTE_check_response(&status, rdb, packet); @@ -7342,6 +7537,7 @@ static void batch_dsql_fetch(rem_port* port, statement->rsr_rows_pending = 0; --statement->rsr_batch_count; dequeue_receive(port); + break; } @@ -7351,17 +7547,28 @@ static void batch_dsql_fetch(rem_port* port, { if (packet->p_sqldata.p_sqldata_status == 100) { - statement->rsr_flags.set(Rsr::EOF_SET); + const auto operation = statement->rsr_fetch_operation; + const auto position = statement->rsr_fetch_position; + + const bool forward = + (operation == fetch_next || operation == fetch_last || + ((operation == fetch_absolute || operation == fetch_relative) && position > 0)); + + if (forward) + statement->rsr_flags.set(Rsr::EOF_SET); + else + statement->rsr_flags.set(Rsr::BOF_SET); + statement->rsr_rows_pending = 0; #ifdef DEBUG fprintf(stdout, "Resetting Rows Pending in batch_dsql_fetch=%lu\n", statement->rsr_rows_pending); #endif } - --statement->rsr_batch_count; - if (statement->rsr_batch_count == 0) { + + if (--statement->rsr_batch_count == 0) statement->rsr_rows_pending = 0; - } + dequeue_receive(port); // clear next queued batch(es) if present @@ -7375,15 +7582,17 @@ static void batch_dsql_fetch(rem_port* port, } break; } + statement->rsr_msgs_waiting++; statement->rsr_rows_pending--; + #ifdef DEBUG fprintf(stdout, "Decrementing Rows Pending in batch_dsql_fetch=%lu\n", statement->rsr_rows_pending); #endif - if (!clear_queue) { + + if (!clear_queue) break; - } } } diff --git a/src/remote/protocol.cpp b/src/remote/protocol.cpp index 66bfcba179a..d87aa5f2272 100644 --- a/src/remote/protocol.cpp +++ b/src/remote/protocol.cpp @@ -302,6 +302,8 @@ bool_t xdr_protocol(RemoteXdr* xdrs, PACKET* p) } #endif + const auto port = xdrs->x_public; + switch (p->p_operation) { case op_reject: @@ -659,11 +661,10 @@ bool_t xdr_protocol(RemoteXdr* xdrs, PACKET* p) } MAP(xdr_short, reinterpret_cast(sqldata->p_sqldata_out_message_number)); } - { // scope - rem_port* port = xdrs->x_public; - if (port->port_protocol >= PROTOCOL_STMT_TOUT) - MAP(xdr_u_long, sqldata->p_sqldata_timeout); - } + if (port->port_protocol >= PROTOCOL_STMT_TOUT) + MAP(xdr_u_long, sqldata->p_sqldata_timeout); + if (port->port_protocol >= PROTOCOL_FETCH_SCROLL) + MAP(xdr_u_long, sqldata->p_sqldata_cursor_flags); DEBUG_PRINTSIZE(xdrs, p->p_operation); return P_TRUE(xdrs, p); @@ -702,6 +703,7 @@ bool_t xdr_protocol(RemoteXdr* xdrs, PACKET* p) return P_TRUE(xdrs, p); case op_fetch: + case op_fetch_scroll: sqldata = &p->p_sqldata; MAP(xdr_short, reinterpret_cast(sqldata->p_sqldata_statement)); if (!xdr_sql_blr(xdrs, (SLONG) sqldata->p_sqldata_statement, @@ -711,6 +713,15 @@ bool_t xdr_protocol(RemoteXdr* xdrs, PACKET* p) } MAP(xdr_short, reinterpret_cast(sqldata->p_sqldata_message_number)); MAP(xdr_short, reinterpret_cast(sqldata->p_sqldata_messages)); + if (p->p_operation == op_fetch_scroll) + { + MAP(xdr_short, reinterpret_cast(sqldata->p_sqldata_fetch_op)); + if (sqldata->p_sqldata_fetch_op == fetch_absolute || + sqldata->p_sqldata_fetch_op == fetch_relative) + { + MAP(xdr_long, sqldata->p_sqldata_fetch_pos); + } + } DEBUG_PRINTSIZE(xdrs, p->p_operation); return P_TRUE(xdrs, p); @@ -820,7 +831,6 @@ bool_t xdr_protocol(RemoteXdr* xdrs, PACKET* p) P_CRYPT_CALLBACK* cc = &p->p_cc; MAP(xdr_cstring, cc->p_cc_data); - rem_port* port = xdrs->x_public; // If the protocol is 0 we are in the process of establishing a connection. // crypt_key_callback at this phaze means server protocol is at least P15 if (port->port_protocol >= PROTOCOL_VERSION14 || port->port_protocol == 0) @@ -856,7 +866,6 @@ bool_t xdr_protocol(RemoteXdr* xdrs, PACKET* p) return P_TRUE(xdrs, p); } - rem_port* port = xdrs->x_public; SSHORT statement_id = b->p_batch_statement; Rsr* statement; if (statement_id >= 0) @@ -933,7 +942,6 @@ bool_t xdr_protocol(RemoteXdr* xdrs, PACKET* p) if (xdrs->x_op == XDR_FREE) return P_TRUE(xdrs, p); - rem_port* port = xdrs->x_public; SSHORT statement_id = b->p_batch_statement; DEB_RBATCH(fprintf(stderr, "BatRem: xdr CS %d\n", statement_id)); Rsr* statement; diff --git a/src/remote/protocol.h b/src/remote/protocol.h index 9d44191b736..862f896db01 100644 --- a/src/remote/protocol.h +++ b/src/remote/protocol.h @@ -94,9 +94,10 @@ const USHORT PROTOCOL_VERSION16 = (FB_PROTOCOL_FLAG | 16); const USHORT PROTOCOL_STMT_TOUT = PROTOCOL_VERSION16; // Protocol 17: -// - supports op_batch_sync, op_info_batch +// - supports op_batch_sync, op_info_batch, op_fetch_scroll const USHORT PROTOCOL_VERSION17 = (FB_PROTOCOL_FLAG | 17); +const USHORT PROTOCOL_FETCH_SCROLL = PROTOCOL_VERSION17; // Architecture types @@ -144,6 +145,18 @@ const int INVALID_OBJECT = MAX_USHORT; const USHORT STMT_NO_BATCH = 2; const USHORT STMT_DEFER_EXECUTE = 4; +enum P_FETCH +{ + fetch_next = 0, + fetch_prior = 1, + fetch_first = 2, + fetch_last = 3, + fetch_absolute = 4, + fetch_relative = 5 +}; + +const P_FETCH fetch_execute = fetch_next; + // Operation (packet) types enum P_OP @@ -297,6 +310,8 @@ enum P_OP op_batch_sync = 110, op_info_batch = 111, + op_fetch_scroll = 112, + op_max }; @@ -605,6 +620,9 @@ typedef struct p_sqldata USHORT p_sqldata_out_message_number; ULONG p_sqldata_status; // final eof status ULONG p_sqldata_timeout; // statement timeout + ULONG p_sqldata_cursor_flags; // cursor flags + P_FETCH p_sqldata_fetch_op; // Fetch operation + SLONG p_sqldata_fetch_pos; // Fetch position } P_SQLDATA; typedef struct p_sqlfree diff --git a/src/remote/remote.h b/src/remote/remote.h index 4032a50d272..abff4c7153c 100644 --- a/src/remote/remote.h +++ b/src/remote/remote.h @@ -497,6 +497,9 @@ struct Rsr : public Firebird::GlobalStorage, public TypedHandle Firebird::BatchCompletionState* rsr_batch_cs; // client }; + P_FETCH rsr_fetch_operation; // Last performed fetch operation + SLONG rsr_fetch_position; // and position + struct BatchStream { BatchStream() @@ -526,17 +529,21 @@ struct Rsr : public Firebird::GlobalStorage, public TypedHandle public: // Values for rsr_flags. - enum { + enum : USHORT { FETCHED = 1, // Cleared by execute, set by fetch EOF_SET = 2, // End-of-stream encountered - //BLOB = 4, // Statement relates to blob op - NO_BATCH = 8, // Do not batch fetch rows - STREAM_ERR = 16, // There is an error pending in the batched rows - LAZY = 32, // To be allocated at the first reference - DEFER_EXECUTE = 64, // op_execute can be deferred - PAST_EOF = 128 // EOF was returned by fetch from this statement + NO_BATCH = 4, // Do not batch fetch rows + STREAM_ERR = 8, // There is an error pending in the batched rows + LAZY = 16, // To be allocated at the first reference + DEFER_EXECUTE = 32, // op_execute can be deferred + PAST_EOF = 64, // EOF was returned by fetch from this statement + BOF_SET = 128, // Beginning-of-stream + PAST_BOF = 256 // BOF was returned by fetch from this statement }; + static const auto STREAM_END = (BOF_SET | EOF_SET); + static const auto PAST_END = (PAST_BOF | PAST_EOF); + public: Rsr() : rsr_next(0), rsr_rdb(0), rsr_rtr(0), rsr_iface(NULL), rsr_cursor(NULL), rsr_batch(NULL), @@ -544,7 +551,8 @@ struct Rsr : public Firebird::GlobalStorage, public TypedHandle rsr_format(0), rsr_message(0), rsr_buffer(0), rsr_status(0), rsr_id(0), rsr_fmt_length(0), rsr_rows_pending(0), rsr_msgs_waiting(0), rsr_reorder_level(0), rsr_batch_count(0), - rsr_cursor_name(getPool()), rsr_delayed_format(false), rsr_timeout(0), rsr_self(NULL) + rsr_cursor_name(getPool()), rsr_delayed_format(false), rsr_timeout(0), rsr_self(NULL), + rsr_fetch_operation(fetch_next), rsr_fetch_position(0) { } ~Rsr() @@ -1336,7 +1344,7 @@ struct rem_port : public Firebird::GlobalStorage, public Firebird::RefCounted ISC_STATUS end_transaction(P_OP, P_RLSE*, PACKET*); ISC_STATUS execute_immediate(P_OP, P_SQLST*, PACKET*); ISC_STATUS execute_statement(P_OP, P_SQLDATA*, PACKET*); - ISC_STATUS fetch(P_SQLDATA*, PACKET*); + ISC_STATUS fetch(P_SQLDATA*, PACKET*, bool); ISC_STATUS get_segment(P_SGMT*, PACKET*); ISC_STATUS get_slice(P_SLC*, PACKET*); void info(P_OP, P_INFO*, PACKET*); diff --git a/src/remote/server/server.cpp b/src/remote/server/server.cpp index 2052986b287..8876a4ec452 100644 --- a/src/remote/server/server.cpp +++ b/src/remote/server/server.cpp @@ -3858,11 +3858,14 @@ ISC_STATUS rem_port::execute_statement(P_OP op, P_SQLDATA* sqldata, PACKET* send if ((flags & IStatement::FLAG_HAS_CURSOR) && (out_msg_length == 0)) { + const auto cursorFlags = (port_protocol >= PROTOCOL_FETCH_SCROLL) ? + sqldata->p_sqldata_cursor_flags : 0; + statement->rsr_cursor = statement->rsr_iface->openCursor(&status_vector, tra, iMsgBuffer.metadata, iMsgBuffer.buffer, (out_blr_length ? oMsgBuffer.metadata : DELAYED_OUT_FORMAT), - 0); + cursorFlags); if (!(status_vector.getState() & Firebird::IStatus::STATE_ERRORS)) { transaction->rtr_cursors.add(statement); @@ -3909,7 +3912,7 @@ ISC_STATUS rem_port::execute_statement(P_OP op, P_SQLDATA* sqldata, PACKET* send } -ISC_STATUS rem_port::fetch(P_SQLDATA * sqldata, PACKET* sendL) +ISC_STATUS rem_port::fetch(P_SQLDATA * sqldata, PACKET* sendL, bool scroll) { /***************************************** * @@ -3926,16 +3929,29 @@ ISC_STATUS rem_port::fetch(P_SQLDATA * sqldata, PACKET* sendL) Rsr* statement; getHandle(statement, sqldata->p_sqldata_statement); - // On first fetch, clear the end-of-stream flag & reset the message buffers + // The default (and legacy) scrolling option is FETCH NEXT + const auto operation = scroll ? sqldata->p_sqldata_fetch_op : fetch_next; + const auto position = scroll ? sqldata->p_sqldata_fetch_pos : 0; + + // Whether we're fetching in the forward direction + const bool forward = + (operation == fetch_next || operation == fetch_last || + ((operation == fetch_absolute || operation == fetch_relative) && position > 0)); + + // Whether we're fetching relatively to the current position + const bool relative = + (operation == fetch_next || operation == fetch_prior || operation == fetch_relative); if (!statement->rsr_flags.test(Rsr::FETCHED)) { - statement->rsr_flags.clear(Rsr::EOF_SET | Rsr::STREAM_ERR); + // On first fetch, clear the end-of-stream flag & reset the message buffers + + statement->rsr_flags.clear(Rsr::STREAM_END | Rsr::STREAM_ERR); statement->clearException(); RMessage* message = statement->rsr_message; - if (message != NULL) + if (message) { statement->rsr_buffer = message; @@ -3949,12 +3965,18 @@ ISC_STATUS rem_port::fetch(P_SQLDATA * sqldata, PACKET* sendL) } } } + else if (!relative) + { + // Clear the end-of-stream flag if the fetch is positioned absolutely + statement->rsr_flags.clear(Rsr::STREAM_END); + } const ULONG msg_length = statement->rsr_format ? statement->rsr_format->fmt_length : 0; // If required, call setDelayedOutputFormat() statement->checkCursor(); + if (statement->rsr_delayed_format) { InternalMessageBuffer msgBuffer(sqldata->p_sqldata_blr.cstr_length, @@ -3969,10 +3991,15 @@ ISC_STATUS rem_port::fetch(P_SQLDATA * sqldata, PACKET* sendL) statement->rsr_delayed_format = false; } - // Get ready to ship the data out + // Setup the prefetch count. It's available only for FETCH NEXT and FETCH PRIOR + // operations, unless batching is disabled explicitly. + + const bool prefetch = (operation == fetch_next || operation == fetch_prior) && + !statement->rsr_flags.test(Rsr::NO_BATCH); - const USHORT max_records = statement->rsr_flags.test(Rsr::NO_BATCH) ? - 1 : sqldata->p_sqldata_messages; + const USHORT max_records = prefetch ? sqldata->p_sqldata_messages : 1; + + // Get ready to ship the data out P_SQLDATA* response = &sendL->p_sqldata; sendL->p_operation = op_fetch_response; @@ -3986,31 +4013,55 @@ ISC_STATUS rem_port::fetch(P_SQLDATA * sqldata, PACKET* sendL) const FB_UINT64 org_packets = this->port_snd_packets; USHORT count = 0; - bool rc = true; + bool success = true; + int rc = 0; for (; count < max_records; count++) { - // Have we exhausted the cache & reached cursor EOF? - if (statement->rsr_flags.test(Rsr::EOF_SET) && !statement->rsr_msgs_waiting) - { - statement->rsr_flags.clear(Rsr::EOF_SET); - rc = false; - break; - } + // If we have exhausted the cache... - // Have we exhausted the cache & have a pending error? - if (statement->rsr_flags.test(Rsr::STREAM_ERR) && !statement->rsr_msgs_waiting) + if (!statement->rsr_msgs_waiting) { - fb_assert(statement->rsr_status); - statement->rsr_flags.clear(Rsr::STREAM_ERR); - return this->send_response(sendL, 0, 0, statement->rsr_status->value(), false); + // ... have we reached end of the cursor? + + if (relative) + { + if (forward) + { + if (statement->rsr_flags.test(Rsr::EOF_SET)) + { + statement->rsr_flags.clear(Rsr::EOF_SET); + success = false; + } + } + else + { + if (statement->rsr_flags.test(Rsr::BOF_SET)) + { + statement->rsr_flags.clear(Rsr::BOF_SET); + success = false; + } + } + } + + if (!success) + break; + + // ... do we have a pending error? + + if (statement->rsr_flags.test(Rsr::STREAM_ERR)) + { + fb_assert(statement->rsr_status); + statement->rsr_flags.clear(Rsr::STREAM_ERR); + return this->send_response(sendL, 0, 0, statement->rsr_status->value(), false); + } } message = statement->rsr_buffer; - // Make sure message can be de referenced, if not then return false + // Make sure message can be dereferenced, if not then return false if (message == NULL) - return FALSE; + return FB_FAILURE; // If we don't have a message cached, get one from the access method. @@ -4018,15 +4069,46 @@ ISC_STATUS rem_port::fetch(P_SQLDATA * sqldata, PACKET* sendL) { fb_assert(statement->rsr_msgs_waiting == 0); - rc = statement->rsr_cursor->fetchNext( - &status_vector, message->msg_buffer) == IStatus::RESULT_OK; + switch (operation) + { + case fetch_next: + rc = statement->rsr_cursor->fetchNext(&status_vector, message->msg_buffer); + break; + + case fetch_prior: + rc = statement->rsr_cursor->fetchPrior(&status_vector, message->msg_buffer); + break; + + case fetch_first: + rc = statement->rsr_cursor->fetchFirst(&status_vector, message->msg_buffer); + break; + + case fetch_last: + rc = statement->rsr_cursor->fetchLast(&status_vector, message->msg_buffer); + break; + + case fetch_absolute: + rc = statement->rsr_cursor->fetchAbsolute(&status_vector, position, + message->msg_buffer); + break; + + case fetch_relative: + rc = statement->rsr_cursor->fetchRelative(&status_vector, position, + message->msg_buffer); + break; + + default: + fb_assert(false); + } statement->rsr_flags.set(Rsr::FETCHED); if (status_vector.getState() & Firebird::IStatus::STATE_ERRORS) return this->send_response(sendL, 0, 0, &status_vector, false); - if (!rc) + success = (rc == IStatus::RESULT_OK); + + if (!success) break; message->msg_address = message->msg_buffer; @@ -4040,8 +4122,7 @@ ISC_STATUS rem_port::fetch(P_SQLDATA * sqldata, PACKET* sendL) // There's a buffer waiting -- send it - if (!this->send_partial(sendL)) - return FALSE; + this->send_partial(sendL); message->msg_address = NULL; @@ -4053,7 +4134,7 @@ ISC_STATUS rem_port::fetch(P_SQLDATA * sqldata, PACKET* sendL) break; } - response->p_sqldata_status = rc ? 0 : 100; + response->p_sqldata_status = success ? 0 : 100; response->p_sqldata_messages = 0; // hvlad: message->msg_address not used in xdr_protocol because of @@ -4079,7 +4160,7 @@ ISC_STATUS rem_port::fetch(P_SQLDATA * sqldata, PACKET* sendL) while (message->msg_address && message->msg_next != statement->rsr_buffer) message = message->msg_next; - USHORT prefetch_count = (rc && !statement->rsr_flags.test(Rsr::NO_BATCH)) ? count : 0; + USHORT prefetch_count = (success && prefetch) ? count : 0; for (; prefetch_count; --prefetch_count) { @@ -4098,8 +4179,21 @@ ISC_STATUS rem_port::fetch(P_SQLDATA * sqldata, PACKET* sendL) next = message; } - rc = statement->rsr_cursor->fetchNext( - &status_vector, message->msg_buffer) == IStatus::RESULT_OK; + // Only FETCH NEXT and FETCH PRIOR operations are available for prefetch + + switch (operation) + { + case fetch_next: + rc = statement->rsr_cursor->fetchNext(&status_vector, message->msg_buffer); + break; + + case fetch_prior: + rc = statement->rsr_cursor->fetchPrior(&status_vector, message->msg_buffer); + break; + + default: + fb_assert(false); + } if (status_vector.getState() & Firebird::IStatus::STATE_ERRORS) { @@ -4109,11 +4203,18 @@ ISC_STATUS rem_port::fetch(P_SQLDATA * sqldata, PACKET* sendL) statement->rsr_flags.set(Rsr::STREAM_ERR); statement->saveException(&status_vector, true); } + break; } - if (!rc) + + if (rc == IStatus::RESULT_NO_DATA) { - statement->rsr_flags.set(Rsr::EOF_SET); + if (statement->rsr_cursor->isBof(&status_vector)) + statement->rsr_flags.set(Rsr::BOF_SET); + + if (statement->rsr_cursor->isEof(&status_vector)) + statement->rsr_flags.set(Rsr::EOF_SET); + break; } @@ -4122,7 +4223,7 @@ ISC_STATUS rem_port::fetch(P_SQLDATA * sqldata, PACKET* sendL) statement->rsr_msgs_waiting++; } - return TRUE; + return FB_SUCCESS; } @@ -4967,7 +5068,8 @@ static bool process_packet(rem_port* port, PACKET* sendL, PACKET* receive, rem_p break; case op_fetch: - port->fetch(&receive->p_sqldata, sendL); + case op_fetch_scroll: + port->fetch(&receive->p_sqldata, sendL, op == op_fetch_scroll); break; case op_free_statement: diff --git a/src/yvalve/why.cpp b/src/yvalve/why.cpp index 92f3f7965bd..e816b6b29a0 100644 --- a/src/yvalve/why.cpp +++ b/src/yvalve/why.cpp @@ -4149,7 +4149,7 @@ int YBlob::getSegment(CheckStatusWrapper* status, unsigned int bufferLength, e.stuffException(status); } - return 0; + return IStatus::RESULT_ERROR; } void YBlob::putSegment(CheckStatusWrapper* status, unsigned int length, const void* buffer) @@ -4750,7 +4750,7 @@ int YResultSet::fetchNext(CheckStatusWrapper* status, void* buffer) e.stuffException(status); } - return FB_FALSE; + return IStatus::RESULT_ERROR; } int YResultSet::fetchPrior(CheckStatusWrapper* status, void* buffer) @@ -4766,7 +4766,7 @@ int YResultSet::fetchPrior(CheckStatusWrapper* status, void* buffer) e.stuffException(status); } - return FB_FALSE; + return IStatus::RESULT_ERROR; } int YResultSet::fetchFirst(CheckStatusWrapper* status, void* buffer) @@ -4782,7 +4782,7 @@ int YResultSet::fetchFirst(CheckStatusWrapper* status, void* buffer) e.stuffException(status); } - return FB_FALSE; + return IStatus::RESULT_ERROR; } int YResultSet::fetchLast(CheckStatusWrapper* status, void* buffer) @@ -4798,7 +4798,7 @@ int YResultSet::fetchLast(CheckStatusWrapper* status, void* buffer) e.stuffException(status); } - return FB_FALSE; + return IStatus::RESULT_ERROR; } int YResultSet::fetchAbsolute(CheckStatusWrapper* status, int position, void* buffer) @@ -4814,7 +4814,7 @@ int YResultSet::fetchAbsolute(CheckStatusWrapper* status, int position, void* bu e.stuffException(status); } - return FB_FALSE; + return IStatus::RESULT_ERROR; } int YResultSet::fetchRelative(CheckStatusWrapper* status, int offset, void* buffer) @@ -4830,7 +4830,7 @@ int YResultSet::fetchRelative(CheckStatusWrapper* status, int offset, void* buff e.stuffException(status); } - return FB_FALSE; + return IStatus::RESULT_ERROR; } FB_BOOLEAN YResultSet::isEof(CheckStatusWrapper* status)