From 051122bb15c2a5f02c35c35530b55523831d0e17 Mon Sep 17 00:00:00 2001 From: abingcbc Date: Wed, 19 Jun 2024 14:25:09 +0800 Subject: [PATCH] fix timeout force read when still can read --- core/event/BlockEventManager.cpp | 8 +-- core/reader/LogFileReader.cpp | 54 +++++++++++-------- core/reader/LogFileReader.h | 6 +-- .../unittest/reader/LogFileReaderUnittest.cpp | 12 +++-- 4 files changed, 48 insertions(+), 32 deletions(-) diff --git a/core/event/BlockEventManager.cpp b/core/event/BlockEventManager.cpp index 3068c7279b..ce43bef41a 100644 --- a/core/event/BlockEventManager.cpp +++ b/core/event/BlockEventManager.cpp @@ -13,10 +13,11 @@ // limitations under the License. #include "BlockEventManager.h" -#include "processor/daemon/LogProcess.h" + #include "common/HashUtil.h" #include "common/StringTools.h" #include "polling/PollingEventQueue.h" +#include "processor/daemon/LogProcess.h" DEFINE_FLAG_INT32(max_block_event_timeout, "max block event timeout, seconds", 3); @@ -48,8 +49,9 @@ void BlockedEventManager::UpdateBlockEvent(const LogstoreFeedBackKey& logstoreKe .append(pEvent->GetConfigName()); hashKey = HashSignatureString(key.c_str(), key.size()); } - // LOG_DEBUG(sLogger, ("Add block event ", pEvent->GetSource())(pEvent->GetObject(), - // pEvent->GetInode())(pEvent->GetConfigName(), hashKey)); + LOG_DEBUG(sLogger, + ("Add block event ", pEvent->GetSource())(pEvent->GetObject(), + pEvent->GetInode())(pEvent->GetConfigName(), hashKey)); ScopedSpinLock lock(mLock); mBlockEventMap[hashKey].Update(logstoreKey, pEvent, curTime); } diff --git a/core/reader/LogFileReader.cpp b/core/reader/LogFileReader.cpp index e18ba332f1..42bec5c319 100644 --- a/core/reader/LogFileReader.cpp +++ b/core/reader/LogFileReader.cpp @@ -1001,28 +1001,26 @@ bool LogFileReader::ReadLog(LogBuffer& logBuffer, const Event* event) { } size_t lastFilePos = mLastFilePos; - bool allowRollback = true; + bool tryRollback = true; if (event != nullptr && event->IsReaderFlushTimeout()) { // If flush timeout event, we should filter whether the event is legacy. if (event->GetLastReadPos() == GetLastReadPos() && event->GetLastFilePos() == mLastFilePos && event->GetInode() == mDevInode.inode) { - allowRollback = false; + // For the scenario: log rotation, the last line needs to be read by timeout, which is a normal situation. + // So here only local warning is given, don't raise alarm. + LOG_WARNING(sLogger, + ("read log", "timeout")("project", GetProject())("logstore", GetLogstore())( + "config", GetConfigName())("log reader queue name", mHostLogPath)("log path", mRealLogPath)( + "file device", ToString(mDevInode.dev))("file inode", ToString(mDevInode.inode))( + "file signature", mLastFileSignatureHash)("file signature size", mLastFileSignatureSize)( + "last file position", mLastFilePos)("last file size", mLastFileSize)( + "read size", mLastFilePos - lastFilePos)("log", logBuffer.rawBuffer)); + tryRollback = false; } else { return false; } } - bool moreData = GetRawData(logBuffer, mLastFileSize, allowRollback); - if (!allowRollback) { - // For the scenario: log rotation, the last line needs to be read by timeout, which is a normal situation. - // So here only local warning is given, don't raise alarm. - LOG_WARNING(sLogger, - ("read timeout", "force read")("project", GetProject())("logstore", GetLogstore())( - "config", GetConfigName())("log reader queue name", mHostLogPath)("log path", mRealLogPath)( - "file device", ToString(mDevInode.dev))("file inode", ToString(mDevInode.inode))( - "file signature", mLastFileSignatureHash)("file signature size", mLastFileSignatureSize)( - "last file position", mLastFilePos)("last file size", mLastFileSize)( - "read size", mLastFilePos - lastFilePos)("log", logBuffer.rawBuffer)); - } + bool moreData = GetRawData(logBuffer, mLastFileSize, tryRollback); if (!logBuffer.rawBuffer.empty() > 0) { if (mEOOption) { // This read was replayed by checkpoint, adjust mLastFilePos to skip hole. @@ -1035,7 +1033,8 @@ bool LogFileReader::ReadLog(LogBuffer& logBuffer, const Event* event) { LOG_DEBUG(sLogger, ("read log file", mRealLogPath)("last file pos", mLastFilePos)("last file size", mLastFileSize)( "read size", mLastFilePos - lastFilePos)); - if (HasDataInCache()) { + if (HasDataInCache() && GetLastReadPos() == mLastFileSize) { + LOG_DEBUG(sLogger, ("add timeout event", mRealLogPath)); auto event = CreateFlushTimeoutEvent(); BlockedEventManager::GetInstance()->UpdateBlockEvent( GetLogstoreKey(), GetConfigName(), *event, mDevInode, time(NULL) + mReaderConfig.first->mFlushTimeoutSecs); @@ -1528,7 +1527,7 @@ bool LogFileReader::GetLogTimeByOffset(const char* buffer, * "SingleLineLog_1\nSingleLineLog_2\nSingleLineLog_3\n" -> "SingleLineLog_1\nSingleLineLog_2\nSingleLineLog_3\0" * "SingleLineLog_1\nSingleLineLog_2\nxxx" -> "SingleLineLog_1\nSingleLineLog_2\0" */ -bool LogFileReader::GetRawData(LogBuffer& logBuffer, int64_t fileSize, bool allowRollback) { +bool LogFileReader::GetRawData(LogBuffer& logBuffer, int64_t fileSize, bool tryRollback) { // Truncate, return false to indicate no more data. if (fileSize == mLastFilePos) { return false; @@ -1536,9 +1535,9 @@ bool LogFileReader::GetRawData(LogBuffer& logBuffer, int64_t fileSize, bool allo bool moreData = false; if (mReaderConfig.first->mFileEncoding == FileReaderOptions::Encoding::GBK) - ReadGBK(logBuffer, fileSize, moreData, allowRollback); + ReadGBK(logBuffer, fileSize, moreData, tryRollback); else - ReadUTF8(logBuffer, fileSize, moreData, allowRollback); + ReadUTF8(logBuffer, fileSize, moreData, tryRollback); int64_t delta = fileSize - mLastFilePos; if (delta > mReaderConfig.first->mReadDelayAlertThresholdBytes && !logBuffer.rawBuffer.empty()) { @@ -1643,7 +1642,7 @@ void LogFileReader::setExactlyOnceCheckpointAfterRead(size_t readSize) { cpt.set_read_length(readSize); } -void LogFileReader::ReadUTF8(LogBuffer& logBuffer, int64_t end, bool& moreData, bool allowRollback) { +void LogFileReader::ReadUTF8(LogBuffer& logBuffer, int64_t end, bool& moreData, bool tryRollback) { char* stringBuffer = nullptr; size_t nbytes = 0; @@ -1661,7 +1660,7 @@ void LogFileReader::ReadUTF8(LogBuffer& logBuffer, int64_t end, bool& moreData, logBuffer.readOffset = mLastFilePos; --nbytes; } - mLastForceRead = !allowRollback; + mLastForceRead = true; mCache.clear(); moreData = false; } else { @@ -1685,6 +1684,11 @@ void LogFileReader::ReadUTF8(LogBuffer& logBuffer, int64_t end, bool& moreData, ? ReadFile(mLogFileOp, stringMemory.data + lastCacheSize, READ_BYTE, lastReadPos, &truncateInfo) : 0UL; stringBuffer = stringMemory.data; + bool allowRollback = true; + // Only when there is no new log and not try rollback, then force read + if (!tryRollback && nbytes == 0) { + allowRollback = false; + } if (nbytes == 0 && (!lastCacheSize || allowRollback)) { // read nothing, if no cached data or allow rollback the // reader's state cannot be changed return; @@ -1767,12 +1771,13 @@ void LogFileReader::ReadUTF8(LogBuffer& logBuffer, int64_t end, bool& moreData, LOG_DEBUG(sLogger, ("read size", nbytes)("last file pos", mLastFilePos)); } -void LogFileReader::ReadGBK(LogBuffer& logBuffer, int64_t end, bool& moreData, bool allowRollback) { +void LogFileReader::ReadGBK(LogBuffer& logBuffer, int64_t end, bool& moreData, bool tryRollback) { std::unique_ptr gbkMemory; char* gbkBuffer = nullptr; size_t readCharCount = 0, originReadCount = 0; int64_t lastReadPos = 0; bool logTooLongSplitFlag = false, fromCpt = false; + bool allowRollback = true; logBuffer.readOffset = mLastFilePos; if (!mLogFileOp.IsOpen()) { @@ -1788,7 +1793,8 @@ void LogFileReader::ReadGBK(LogBuffer& logBuffer, int64_t end, bool& moreData, b logBuffer.readOffset = mLastFilePos; --readCharCount; } - mLastForceRead = !allowRollback; + mLastForceRead = true; + allowRollback = false; lastReadPos = mLastFilePos + readCharCount; originReadCount = readCharCount; moreData = false; @@ -1807,6 +1813,10 @@ void LogFileReader::ReadGBK(LogBuffer& logBuffer, int64_t end, bool& moreData, b lastReadPos = GetLastReadPos(); readCharCount = READ_BYTE ? ReadFile(mLogFileOp, gbkBuffer + lastCacheSize, READ_BYTE, lastReadPos, &truncateInfo) : 0UL; + // Only when there is no new log and not try rollback, then force read + if (!tryRollback && readCharCount == 0) { + allowRollback = false; + } if (readCharCount == 0 && (!lastCacheSize || allowRollback)) { // just keep last cache return; } diff --git a/core/reader/LogFileReader.h b/core/reader/LogFileReader.h index 2eda081a28..dcca5c255d 100644 --- a/core/reader/LogFileReader.h +++ b/core/reader/LogFileReader.h @@ -355,9 +355,9 @@ class LogFileReader { int64_t GetLogGroupKey() const { return mLogGroupKey; } protected: - bool GetRawData(LogBuffer& logBuffer, int64_t fileSize, bool allowRollback = true); - void ReadUTF8(LogBuffer& logBuffer, int64_t end, bool& moreData, bool allowRollback = true); - void ReadGBK(LogBuffer& logBuffer, int64_t end, bool& moreData, bool allowRollback = true); + bool GetRawData(LogBuffer& logBuffer, int64_t fileSize, bool tryRollback = true); + void ReadUTF8(LogBuffer& logBuffer, int64_t end, bool& moreData, bool tryRollback = true); + void ReadGBK(LogBuffer& logBuffer, int64_t end, bool& moreData, bool tryRollback = true); size_t ReadFile(LogFileOperator& logFileOp, void* buf, size_t size, int64_t& offset, TruncateInfo** truncateInfo = NULL); diff --git a/core/unittest/reader/LogFileReaderUnittest.cpp b/core/unittest/reader/LogFileReaderUnittest.cpp index a37df6956e..5c0947f305 100644 --- a/core/unittest/reader/LogFileReaderUnittest.cpp +++ b/core/unittest/reader/LogFileReaderUnittest.cpp @@ -260,9 +260,11 @@ void LogFileReaderUnittest::TestReadGBK() { expectedPart.resize(firstReadSize); reader.ReadGBK(logBuffer, 127, moreData, false); // first line without \n APSARA_TEST_FALSE_FATAL(moreData); - APSARA_TEST_STREQ_FATAL(expectedPart.c_str(), logBuffer.rawBuffer.data()); - APSARA_TEST_EQUAL_FATAL(reader.mCache.size(), 0UL); + APSARA_TEST_FALSE_FATAL(reader.mLastForceRead); + reader.ReadGBK(logBuffer, 127, moreData, false); // force read, clear cache APSARA_TEST_TRUE_FATAL(reader.mLastForceRead); + APSARA_TEST_EQUAL_FATAL(reader.mCache.size(), 0UL); + APSARA_TEST_STREQ_FATAL(expectedPart.c_str(), logBuffer.rawBuffer.data()); // second read, start with \n but with other lines reader.ReadGBK(logBuffer, fileSize - 1, moreData); @@ -455,9 +457,11 @@ void LogFileReaderUnittest::TestReadUTF8() { reader.mLastForceRead = true; reader.ReadUTF8(logBuffer, firstReadSize, moreData, false); APSARA_TEST_FALSE_FATAL(moreData); - APSARA_TEST_STREQ_FATAL(expectedPart.c_str(), logBuffer.rawBuffer.data()); - APSARA_TEST_EQUAL_FATAL(reader.mCache.size(), 0UL); + APSARA_TEST_FALSE_FATAL(reader.mLastForceRead); + reader.ReadUTF8(logBuffer, firstReadSize, moreData, false); // force read, clear cache APSARA_TEST_TRUE_FATAL(reader.mLastForceRead); + APSARA_TEST_EQUAL_FATAL(reader.mCache.size(), 0UL); + APSARA_TEST_STREQ_FATAL(expectedPart.c_str(), logBuffer.rawBuffer.data()); // second read, start with \n but with other lines reader.ReadUTF8(logBuffer, fileSize - 1, moreData);