Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

fix timeout force read when still can read #1551

Merged
merged 1 commit into from
Jun 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions core/event/BlockEventManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,11 @@
// limitations under the License.

#include "BlockEventManager.h"
#include "processor/daemon/LogProcess.h"

#include "common/HashUtil.h"
#include "common/StringTools.h"
#include "polling/PollingEventQueue.h"
#include "processor/daemon/LogProcess.h"

DEFINE_FLAG_INT32(max_block_event_timeout, "max block event timeout, seconds", 3);

Expand Down Expand Up @@ -48,8 +49,9 @@ void BlockedEventManager::UpdateBlockEvent(const LogstoreFeedBackKey& logstoreKe
.append(pEvent->GetConfigName());
hashKey = HashSignatureString(key.c_str(), key.size());
}
// LOG_DEBUG(sLogger, ("Add block event ", pEvent->GetSource())(pEvent->GetObject(),
// pEvent->GetInode())(pEvent->GetConfigName(), hashKey));
LOG_DEBUG(sLogger,
("Add block event ", pEvent->GetSource())(pEvent->GetObject(),
pEvent->GetInode())(pEvent->GetConfigName(), hashKey));
ScopedSpinLock lock(mLock);
mBlockEventMap[hashKey].Update(logstoreKey, pEvent, curTime);
}
Expand Down
54 changes: 32 additions & 22 deletions core/reader/LogFileReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1001,28 +1001,26 @@ bool LogFileReader::ReadLog(LogBuffer& logBuffer, const Event* event) {
}

size_t lastFilePos = mLastFilePos;
bool allowRollback = true;
bool tryRollback = true;
if (event != nullptr && event->IsReaderFlushTimeout()) {
// If flush timeout event, we should filter whether the event is legacy.
if (event->GetLastReadPos() == GetLastReadPos() && event->GetLastFilePos() == mLastFilePos
&& event->GetInode() == mDevInode.inode) {
allowRollback = false;
// For the scenario: log rotation, the last line needs to be read by timeout, which is a normal situation.
// So here only local warning is given, don't raise alarm.
LOG_WARNING(sLogger,
("read log", "timeout")("project", GetProject())("logstore", GetLogstore())(
"config", GetConfigName())("log reader queue name", mHostLogPath)("log path", mRealLogPath)(
"file device", ToString(mDevInode.dev))("file inode", ToString(mDevInode.inode))(
"file signature", mLastFileSignatureHash)("file signature size", mLastFileSignatureSize)(
"last file position", mLastFilePos)("last file size", mLastFileSize)(
"read size", mLastFilePos - lastFilePos)("log", logBuffer.rawBuffer));
tryRollback = false;
} else {
return false;
}
}
bool moreData = GetRawData(logBuffer, mLastFileSize, allowRollback);
if (!allowRollback) {
// For the scenario: log rotation, the last line needs to be read by timeout, which is a normal situation.
// So here only local warning is given, don't raise alarm.
LOG_WARNING(sLogger,
("read timeout", "force read")("project", GetProject())("logstore", GetLogstore())(
"config", GetConfigName())("log reader queue name", mHostLogPath)("log path", mRealLogPath)(
"file device", ToString(mDevInode.dev))("file inode", ToString(mDevInode.inode))(
"file signature", mLastFileSignatureHash)("file signature size", mLastFileSignatureSize)(
"last file position", mLastFilePos)("last file size", mLastFileSize)(
"read size", mLastFilePos - lastFilePos)("log", logBuffer.rawBuffer));
}
bool moreData = GetRawData(logBuffer, mLastFileSize, tryRollback);
if (!logBuffer.rawBuffer.empty() > 0) {
if (mEOOption) {
// This read was replayed by checkpoint, adjust mLastFilePos to skip hole.
Expand All @@ -1035,7 +1033,8 @@ bool LogFileReader::ReadLog(LogBuffer& logBuffer, const Event* event) {
LOG_DEBUG(sLogger,
("read log file", mRealLogPath)("last file pos", mLastFilePos)("last file size", mLastFileSize)(
"read size", mLastFilePos - lastFilePos));
if (HasDataInCache()) {
if (HasDataInCache() && GetLastReadPos() == mLastFileSize) {
LOG_DEBUG(sLogger, ("add timeout event", mRealLogPath));
auto event = CreateFlushTimeoutEvent();
BlockedEventManager::GetInstance()->UpdateBlockEvent(
GetLogstoreKey(), GetConfigName(), *event, mDevInode, time(NULL) + mReaderConfig.first->mFlushTimeoutSecs);
Expand Down Expand Up @@ -1528,17 +1527,17 @@ bool LogFileReader::GetLogTimeByOffset(const char* buffer,
* "SingleLineLog_1\nSingleLineLog_2\nSingleLineLog_3\n" -> "SingleLineLog_1\nSingleLineLog_2\nSingleLineLog_3\0"
* "SingleLineLog_1\nSingleLineLog_2\nxxx" -> "SingleLineLog_1\nSingleLineLog_2\0"
*/
bool LogFileReader::GetRawData(LogBuffer& logBuffer, int64_t fileSize, bool allowRollback) {
bool LogFileReader::GetRawData(LogBuffer& logBuffer, int64_t fileSize, bool tryRollback) {
// Truncate, return false to indicate no more data.
if (fileSize == mLastFilePos) {
return false;
}

bool moreData = false;
if (mReaderConfig.first->mFileEncoding == FileReaderOptions::Encoding::GBK)
ReadGBK(logBuffer, fileSize, moreData, allowRollback);
ReadGBK(logBuffer, fileSize, moreData, tryRollback);
else
ReadUTF8(logBuffer, fileSize, moreData, allowRollback);
ReadUTF8(logBuffer, fileSize, moreData, tryRollback);

int64_t delta = fileSize - mLastFilePos;
if (delta > mReaderConfig.first->mReadDelayAlertThresholdBytes && !logBuffer.rawBuffer.empty()) {
Expand Down Expand Up @@ -1643,7 +1642,7 @@ void LogFileReader::setExactlyOnceCheckpointAfterRead(size_t readSize) {
cpt.set_read_length(readSize);
}

void LogFileReader::ReadUTF8(LogBuffer& logBuffer, int64_t end, bool& moreData, bool allowRollback) {
void LogFileReader::ReadUTF8(LogBuffer& logBuffer, int64_t end, bool& moreData, bool tryRollback) {
char* stringBuffer = nullptr;
size_t nbytes = 0;

Expand All @@ -1661,7 +1660,7 @@ void LogFileReader::ReadUTF8(LogBuffer& logBuffer, int64_t end, bool& moreData,
logBuffer.readOffset = mLastFilePos;
--nbytes;
}
mLastForceRead = !allowRollback;
mLastForceRead = true;
mCache.clear();
moreData = false;
} else {
Expand All @@ -1685,6 +1684,11 @@ void LogFileReader::ReadUTF8(LogBuffer& logBuffer, int64_t end, bool& moreData,
? ReadFile(mLogFileOp, stringMemory.data + lastCacheSize, READ_BYTE, lastReadPos, &truncateInfo)
: 0UL;
stringBuffer = stringMemory.data;
bool allowRollback = true;
// Only when there is no new log and not try rollback, then force read
if (!tryRollback && nbytes == 0) {
allowRollback = false;
}
if (nbytes == 0 && (!lastCacheSize || allowRollback)) { // read nothing, if no cached data or allow rollback the
// reader's state cannot be changed
return;
Expand Down Expand Up @@ -1767,12 +1771,13 @@ void LogFileReader::ReadUTF8(LogBuffer& logBuffer, int64_t end, bool& moreData,
LOG_DEBUG(sLogger, ("read size", nbytes)("last file pos", mLastFilePos));
}

void LogFileReader::ReadGBK(LogBuffer& logBuffer, int64_t end, bool& moreData, bool allowRollback) {
void LogFileReader::ReadGBK(LogBuffer& logBuffer, int64_t end, bool& moreData, bool tryRollback) {
std::unique_ptr<char[]> gbkMemory;
char* gbkBuffer = nullptr;
size_t readCharCount = 0, originReadCount = 0;
int64_t lastReadPos = 0;
bool logTooLongSplitFlag = false, fromCpt = false;
bool allowRollback = true;

logBuffer.readOffset = mLastFilePos;
if (!mLogFileOp.IsOpen()) {
Expand All @@ -1788,7 +1793,8 @@ void LogFileReader::ReadGBK(LogBuffer& logBuffer, int64_t end, bool& moreData, b
logBuffer.readOffset = mLastFilePos;
--readCharCount;
}
mLastForceRead = !allowRollback;
mLastForceRead = true;
allowRollback = false;
lastReadPos = mLastFilePos + readCharCount;
originReadCount = readCharCount;
moreData = false;
Expand All @@ -1807,6 +1813,10 @@ void LogFileReader::ReadGBK(LogBuffer& logBuffer, int64_t end, bool& moreData, b
lastReadPos = GetLastReadPos();
readCharCount
= READ_BYTE ? ReadFile(mLogFileOp, gbkBuffer + lastCacheSize, READ_BYTE, lastReadPos, &truncateInfo) : 0UL;
// Only when there is no new log and not try rollback, then force read
if (!tryRollback && readCharCount == 0) {
allowRollback = false;
}
if (readCharCount == 0 && (!lastCacheSize || allowRollback)) { // just keep last cache
return;
}
Expand Down
6 changes: 3 additions & 3 deletions core/reader/LogFileReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -355,9 +355,9 @@ class LogFileReader {
int64_t GetLogGroupKey() const { return mLogGroupKey; }

protected:
bool GetRawData(LogBuffer& logBuffer, int64_t fileSize, bool allowRollback = true);
void ReadUTF8(LogBuffer& logBuffer, int64_t end, bool& moreData, bool allowRollback = true);
void ReadGBK(LogBuffer& logBuffer, int64_t end, bool& moreData, bool allowRollback = true);
bool GetRawData(LogBuffer& logBuffer, int64_t fileSize, bool tryRollback = true);
void ReadUTF8(LogBuffer& logBuffer, int64_t end, bool& moreData, bool tryRollback = true);
void ReadGBK(LogBuffer& logBuffer, int64_t end, bool& moreData, bool tryRollback = true);

size_t
ReadFile(LogFileOperator& logFileOp, void* buf, size_t size, int64_t& offset, TruncateInfo** truncateInfo = NULL);
Expand Down
12 changes: 8 additions & 4 deletions core/unittest/reader/LogFileReaderUnittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -260,9 +260,11 @@ void LogFileReaderUnittest::TestReadGBK() {
expectedPart.resize(firstReadSize);
reader.ReadGBK(logBuffer, 127, moreData, false); // first line without \n
APSARA_TEST_FALSE_FATAL(moreData);
APSARA_TEST_STREQ_FATAL(expectedPart.c_str(), logBuffer.rawBuffer.data());
APSARA_TEST_EQUAL_FATAL(reader.mCache.size(), 0UL);
APSARA_TEST_FALSE_FATAL(reader.mLastForceRead);
reader.ReadGBK(logBuffer, 127, moreData, false); // force read, clear cache
APSARA_TEST_TRUE_FATAL(reader.mLastForceRead);
APSARA_TEST_EQUAL_FATAL(reader.mCache.size(), 0UL);
APSARA_TEST_STREQ_FATAL(expectedPart.c_str(), logBuffer.rawBuffer.data());

// second read, start with \n but with other lines
reader.ReadGBK(logBuffer, fileSize - 1, moreData);
Expand Down Expand Up @@ -455,9 +457,11 @@ void LogFileReaderUnittest::TestReadUTF8() {
reader.mLastForceRead = true;
reader.ReadUTF8(logBuffer, firstReadSize, moreData, false);
APSARA_TEST_FALSE_FATAL(moreData);
APSARA_TEST_STREQ_FATAL(expectedPart.c_str(), logBuffer.rawBuffer.data());
APSARA_TEST_EQUAL_FATAL(reader.mCache.size(), 0UL);
APSARA_TEST_FALSE_FATAL(reader.mLastForceRead);
reader.ReadUTF8(logBuffer, firstReadSize, moreData, false); // force read, clear cache
APSARA_TEST_TRUE_FATAL(reader.mLastForceRead);
APSARA_TEST_EQUAL_FATAL(reader.mCache.size(), 0UL);
APSARA_TEST_STREQ_FATAL(expectedPart.c_str(), logBuffer.rawBuffer.data());

// second read, start with \n but with other lines
reader.ReadUTF8(logBuffer, fileSize - 1, moreData);
Expand Down
Loading