From b2f091ce50a49f3d5f1eb3e360d515610e2cf63c Mon Sep 17 00:00:00 2001 From: henryzhx8 Date: Fri, 10 Nov 2023 15:38:49 +0800 Subject: [PATCH] fix duplication problem arised from #1216 (#1232) --- core/reader/LogFileReader.cpp | 118 ++++++++++++++++++---------------- 1 file changed, 64 insertions(+), 54 deletions(-) diff --git a/core/reader/LogFileReader.cpp b/core/reader/LogFileReader.cpp index 4463c29db4..d66def7824 100644 --- a/core/reader/LogFileReader.cpp +++ b/core/reader/LogFileReader.cpp @@ -80,7 +80,7 @@ void LogFileReader::DumpMetaToMem(bool checkConfigFlag) { ("skip dump reader meta", "invalid log reader queue name")("project", mProjectName)( "logstore", mCategory)("config", mConfigName)("log reader queue name", mHostLogPath)( "file device", ToString(mDevInode.dev))("file inode", ToString(mDevInode.inode))( - "file signature", mLastFileSignatureHash)); + "file signature", mLastFileSignatureHash)("file signature size", mLastFileSignatureSize)); return; } string dirPath = mHostLogPath.substr(0, index); @@ -90,15 +90,16 @@ void LogFileReader::DumpMetaToMem(bool checkConfigFlag) { ("skip dump reader meta", "no config matches the file path")("project", mProjectName)( "logstore", mCategory)("config", mConfigName)("log reader queue name", mHostLogPath)( "file device", ToString(mDevInode.dev))("file inode", ToString(mDevInode.inode))( - "file signature", mLastFileSignatureHash)); + "file signature", mLastFileSignatureHash)("file signature size", mLastFileSignatureSize)); return; } LOG_INFO(sLogger, ("dump log reader meta, project", mProjectName)("logstore", mCategory)("config", mConfigName)( "log reader queue name", mHostLogPath)("file device", ToString(mDevInode.dev))( "file inode", ToString(mDevInode.inode))("file signature", mLastFileSignatureHash)( - "real file path", mRealLogPath)("file size", mLastFileSize)("last file position", mLastFilePos)( - "is file opened", ToString(mLogFileOp.IsOpen()))); + "file signature size", mLastFileSignatureSize)("real file path", mRealLogPath)( + "file size", mLastFileSize)("last file position", mLastFilePos)("is file opened", + ToString(mLogFileOp.IsOpen()))); } CheckPoint* checkPointPtr = new CheckPoint(mHostLogPath, mLastFilePos, @@ -161,8 +162,8 @@ void LogFileReader::InitReader(bool tailExisted, FileReadPolicy policy, uint32_t ("recover log reader status from checkpoint, project", mProjectName)("logstore", mCategory)( "config", mConfigName)("log reader queue name", mHostLogPath)( "file device", ToString(mDevInode.dev))("file inode", ToString(mDevInode.inode))( - "file signature", mLastFileSignatureHash)("real file path", mRealLogPath)( - "file size", mLastFileSize)("last file position", mLastFilePos)); + "file signature", mLastFileSignatureHash)("file signature size", mLastFileSignatureSize)( + "real file path", mRealLogPath)("last file position", mLastFilePos)); // if file is open or // last update time is new and the file's container is not stopped we // we should use first modify @@ -539,8 +540,8 @@ void LogFileReader::SetReadFromBeginning() { LOG_INFO(sLogger, ("force reading file from the beginning, project", mProjectName)("logstore", mCategory)( "config", mConfigName)("log reader queue name", mHostLogPath)("file device", ToString(mDevInode.dev))( - "file inode", ToString(mDevInode.inode))("file signature", mLastFileSignatureHash)("file size", - mLastFileSize)); + "file inode", ToString(mDevInode.inode))("file signature", mLastFileSignatureHash)( + "file signature size", mLastFileSignatureSize)("file size", mLastFileSize)); mFirstWatched = false; } @@ -742,7 +743,8 @@ bool LogFileReader::CheckForFirstOpen(FileReadPolicy policy) { "open file failed when trying to find the start position for reading")("project", mProjectName)( "logstore", mCategory)("config", mConfigName)("log reader queue name", mHostLogPath)( "file device", ToString(mDevInode.dev))("file inode", ToString(mDevInode.inode))( - "file signature", mLastFileSignatureHash)("file size", mLastFileSize)); + "file signature", mLastFileSignatureHash)("file signature size", + mLastFileSignatureSize)("file size", mLastFileSize)); auto error = GetErrno(); if (fsutil::Dir::IsENOENT(error)) return true; @@ -776,8 +778,8 @@ bool LogFileReader::CheckForFirstOpen(FileReadPolicy policy) { LOG_INFO(sLogger, ("set the starting position for reading, project", mProjectName)("logstore", mCategory)( "config", mConfigName)("log reader queue name", mHostLogPath)("file device", ToString(mDevInode.dev))( - "file inode", ToString(mDevInode.inode))("file signature", mLastFileSignatureHash)("start position", - mLastFilePos)); + "file inode", ToString(mDevInode.inode))("file signature", mLastFileSignatureHash)( + "file signature size", mLastFileSignatureSize)("start position", mLastFilePos)); return true; } @@ -827,9 +829,9 @@ void LogFileReader::FixLastFilePos(LogFileOperator& op, int64_t endOffset) { LOG_WARNING(sLogger, ("no begin line found", "most likely to have parse error when reading begins")("project", mProjectName)( - "logstore", mCategory)("config", mConfigName)("log reader queue name", - mHostLogPath)("file device", ToString(mDevInode.dev))( - "file inode", ToString(mDevInode.inode))("file signature", mLastFileSignatureHash)( + "logstore", mCategory)("config", mConfigName)("log reader queue name", mHostLogPath)( + "file device", ToString(mDevInode.dev))("file inode", ToString(mDevInode.inode))( + "file signature", mLastFileSignatureHash)("file signature size", mLastFileSignatureSize)( "search start position", mLastFilePos)("search end position", mLastFilePos + readSizeReal)); free(readBuf); @@ -1047,17 +1049,17 @@ void LogFileReader::OnOpenFileError() { LOG_INFO(sLogger, ("open file failed", " log file not exist, probably caused by rollback")("project", mProjectName)( "logstore", mCategory)("config", mConfigName)("log reader queue name", mHostLogPath)( - "log path", mRealLogPath)("file device", ToString(mDevInode.dev))("file inode", - ToString(mDevInode.inode))( - "file signature", mLastFileSignatureHash)("last file position", mLastFilePos)); + "log path", mRealLogPath)("file device", ToString(mDevInode.dev))( + "file inode", ToString(mDevInode.inode))("file signature", mLastFileSignatureHash)( + "file signature size", mLastFileSignatureSize)("last file position", mLastFilePos)); break; case EACCES: LOG_ERROR(sLogger, ("open file failed", "open log file fail because of permission")("project", mProjectName)( "logstore", mCategory)("config", mConfigName)("log reader queue name", mHostLogPath)( - "log path", mRealLogPath)("file device", ToString(mDevInode.dev))("file inode", - ToString(mDevInode.inode))( - "file signature", mLastFileSignatureHash)("last file position", mLastFilePos)); + "log path", mRealLogPath)("file device", ToString(mDevInode.dev))( + "file inode", ToString(mDevInode.inode))("file signature", mLastFileSignatureHash)( + "file signature size", mLastFileSignatureSize)("last file position", mLastFilePos)); LogtailAlarm::GetInstance()->SendAlarm(LOGFILE_PERMINSSION_ALARM, string("Failed to open log file because of permission: ") + mHostLogPath, @@ -1070,7 +1072,8 @@ void LogFileReader::OnOpenFileError() { ("open file failed", "too many open file")("project", mProjectName)("logstore", mCategory)( "config", mConfigName)("log reader queue name", mHostLogPath)("log path", mRealLogPath)( "file device", ToString(mDevInode.dev))("file inode", ToString(mDevInode.inode))( - "file signature", mLastFileSignatureHash)("last file position", mLastFilePos)); + "file signature", mLastFileSignatureHash)("file signature size", mLastFileSignatureSize)( + "last file position", mLastFilePos)); LogtailAlarm::GetInstance()->SendAlarm(OPEN_LOGFILE_FAIL_ALARM, string("Failed to open log file because of : Too many open files") + mHostLogPath, @@ -1083,7 +1086,8 @@ void LogFileReader::OnOpenFileError() { ("open file failed, errno", ErrnoToString(GetErrno()))("logstore", mCategory)( "config", mConfigName)("log reader queue name", mHostLogPath)("log path", mRealLogPath)( "file device", ToString(mDevInode.dev))("file inode", ToString(mDevInode.inode))( - "file signature", mLastFileSignatureHash)("last file position", mLastFilePos)); + "file signature", mLastFileSignatureHash)("file signature size", mLastFileSignatureSize)( + "last file position", mLastFilePos)); LogtailAlarm::GetInstance()->SendAlarm(OPEN_LOGFILE_FAIL_ALARM, string("Failed to open log file: ") + mHostLogPath + "; errono:" + ErrnoToString(GetErrno()), @@ -1112,9 +1116,10 @@ bool LogFileReader::UpdateFilePtr() { ("open file failed, opened fd exceed limit, too many open files", GloablFileDescriptorManager::GetInstance()->GetOpenedFilePtrSize())( "limit", INT32_FLAG(max_reader_open_files))("project", mProjectName)("logstore", mCategory)( - "config", mConfigName)("log reader queue name", mHostLogPath)( - "file device", ToString(mDevInode.dev))("file inode", ToString(mDevInode.inode))( - "file signature", mLastFileSignatureHash)("last file position", mLastFilePos)); + "config", mConfigName)("log reader queue name", mHostLogPath)("file device", + ToString(mDevInode.dev))( + "file inode", ToString(mDevInode.inode))("file signature", mLastFileSignatureHash)( + "file signature size", mLastFileSignatureSize)("last file position", mLastFilePos)); LogtailAlarm::GetInstance()->SendAlarm(OPEN_FILE_LIMIT_ALARM, string("Failed to open log file: ") + mHostLogPath + " limit:" + ToString(INT32_FLAG(max_reader_open_files)), @@ -1144,8 +1149,8 @@ bool LogFileReader::UpdateFilePtr() { ("open file succeeded, project", mProjectName)("logstore", mCategory)("config", mConfigName)( "log reader queue name", mHostLogPath)("real file path", mRealLogPath)( "file device", ToString(mDevInode.dev))("file inode", ToString(mDevInode.inode))( - "file signature", mLastFileSignatureHash)("last file position", mLastFilePos)("reader id", - long(this))); + "file signature", mLastFileSignatureHash)("file signature size", mLastFileSignatureSize)( + "last file position", mLastFilePos)("reader id", long(this))); return true; } else { mLogFileOp.Close(); @@ -1156,9 +1161,9 @@ bool LogFileReader::UpdateFilePtr() { ("open file failed, log file dev inode changed or file deleted ", "prepare to delete reader or put reader into rotated map")("project", mProjectName)( "logstore", mCategory)("config", mConfigName)("log reader queue name", mHostLogPath)( - "log path", mRealLogPath)("file device", ToString(mDevInode.dev))("file inode", - ToString(mDevInode.inode))( - "file signature", mLastFileSignatureHash)("last file position", mLastFilePos)); + "log path", mRealLogPath)("file device", ToString(mDevInode.dev))( + "file inode", ToString(mDevInode.inode))("file signature", mLastFileSignatureHash)( + "file signature size", mLastFileSignatureSize)("last file position", mLastFilePos)); return false; } tryTime = 0; @@ -1181,8 +1186,8 @@ bool LogFileReader::UpdateFilePtr() { ("open file succeeded, project", mProjectName)("logstore", mCategory)("config", mConfigName)( "log reader queue name", mHostLogPath)("real file path", mRealLogPath)( "file device", ToString(mDevInode.dev))("file inode", ToString(mDevInode.inode))( - "file signature", mLastFileSignatureHash)("last file position", mLastFilePos)("reader id", - long(this))); + "file signature", mLastFileSignatureHash)("file signature size", mLastFileSignatureSize)( + "last file position", mLastFilePos)("reader id", long(this))); return true; } else { mLogFileOp.Close(); @@ -1190,9 +1195,10 @@ bool LogFileReader::UpdateFilePtr() { LOG_INFO(sLogger, ("open file failed, log file dev inode changed or file deleted ", "prepare to delete reader")("project", mProjectName)("logstore", mCategory)("config", mConfigName)( - "log reader queue name", mHostLogPath)("log path", mRealLogPath)( - "file device", ToString(mDevInode.dev))("file inode", ToString(mDevInode.inode))( - "file signature", mLastFileSignatureHash)("last file position", mLastFilePos)); + "log reader queue name", mHostLogPath)("log path", mRealLogPath)("file device", + ToString(mDevInode.dev))( + "file inode", ToString(mDevInode.inode))("file signature", mLastFileSignatureHash)( + "file signature size", mLastFileSignatureSize)("last file position", mLastFilePos)); return false; } return true; @@ -1208,9 +1214,10 @@ bool LogFileReader::CloseTimeoutFilePtr(int32_t curTime) { if ((int64_t)buf.GetFileSize() == mLastFilePos) { LOG_INFO(sLogger, ("close the file", "current log file has not been updated for some time and has been read")( - "project", mProjectName)("logstore", mCategory)("config", mConfigName)( - "log reader queue name", mHostLogPath)("file device", ToString(mDevInode.dev))( - "file inode", ToString(mDevInode.inode))("file signature", mLastFileSignatureHash)( + "project", mProjectName)("logstore", mCategory)("config", mConfigName)("log reader queue name", + mHostLogPath)( + "file device", ToString(mDevInode.dev))("file inode", ToString(mDevInode.inode))( + "file signature", mLastFileSignatureHash)("file signature size", mLastFileSignatureSize)( "file size", mLastFileSize)("last file position", mLastFilePos)); CloseFilePtr(); // delete item in LogFileCollectOffsetIndicator map @@ -1236,8 +1243,8 @@ void LogFileReader::CloseFilePtr() { ("update the real file path of the log reader during closing, project", mProjectName)( "logstore", mCategory)("config", mConfigName)("log reader queue name", mHostLogPath)( "file device", ToString(mDevInode.dev))("file inode", ToString(mDevInode.inode))( - "file signature", mLastFileSignatureHash)("original file path", - mRealLogPath)("new file path", curRealLogPath)); + "file signature", mLastFileSignatureHash)("file signature size", mLastFileSignatureSize)( + "original file path", mRealLogPath)("new file path", curRealLogPath)); mRealLogPath = curRealLogPath; if (mEOOption && mRealLogPath != mEOOption->primaryCheckpoint.real_path()) { updatePrimaryCheckpointRealPath(); @@ -1249,12 +1256,13 @@ void LogFileReader::CloseFilePtr() { if (mLogFileOp.Close() != 0) { int fd = mLogFileOp.GetFd(); - LOG_WARNING(sLogger, - ("close file error", strerror(errno))("fd", fd)("project", mProjectName)("logstore", mCategory)( - "config", mConfigName)("log reader queue name", mHostLogPath)( - "real file path", mRealLogPath)("file device", ToString(mDevInode.dev))( - "file inode", ToString(mDevInode.inode))("file signature", mLastFileSignatureHash)( - "file size", mLastFileSize)("last file position", mLastFilePos)("reader id", long(this))); + LOG_WARNING( + sLogger, + ("close file error", strerror(errno))("fd", fd)("project", mProjectName)("logstore", mCategory)( + "config", mConfigName)("log reader queue name", mHostLogPath)("real file path", mRealLogPath)( + "file device", ToString(mDevInode.dev))("file inode", ToString(mDevInode.inode))( + "file signature", mLastFileSignatureHash)("file signature size", mLastFileSignatureSize)( + "file size", mLastFileSize)("last file position", mLastFilePos)("reader id", long(this))); LogtailAlarm::GetInstance()->SendAlarm(OPEN_LOGFILE_FAIL_ALARM, string("close file error because of ") + strerror(errno) + ", file path: " + mHostLogPath + ", inode: " @@ -1265,9 +1273,9 @@ void LogFileReader::CloseFilePtr() { } else { LOG_INFO(sLogger, ("close file succeeded, project", mProjectName)("logstore", mCategory)("config", mConfigName)( - "log reader queue name", mHostLogPath)("real file path", - mRealLogPath)("file device", ToString(mDevInode.dev))( - "file inode", ToString(mDevInode.inode))("file signature", mLastFileSignatureHash)( + "log reader queue name", mHostLogPath)("real file path", mRealLogPath)( + "file device", ToString(mDevInode.dev))("file inode", ToString(mDevInode.inode))( + "file signature", mLastFileSignatureHash)("file signature size", mLastFileSignatureSize)( "file size", mLastFileSize)("last file position", mLastFilePos)("reader id", long(this))); } // always call OnFileClose @@ -1303,8 +1311,9 @@ bool LogFileReader::CheckFileSignatureAndOffset(bool isOpenOnUpdate) { if (mLogFileOp.Close() == 0) { LOG_INFO(sLogger, ("close file succeeded, project", mProjectName)("logstore", mCategory)("config", mConfigName)( - "log reader queue name", mHostLogPath)("file device", ToString(mDevInode.dev))( - "file inode", ToString(mDevInode.inode))("file signature", mLastFileSignatureHash)( + "log reader queue name", + mHostLogPath)("file device", ToString(mDevInode.dev))("file inode", ToString(mDevInode.inode))( + "file signature", mLastFileSignatureHash)("file signature size", mLastFileSignatureSize)( "file size", mLastFileSize)("last file position", mLastFilePos)); } GloablFileDescriptorManager::GetInstance()->OnFileClose(this); @@ -1334,7 +1343,7 @@ bool LogFileReader::CheckFileSignatureAndOffset(bool isOpenOnUpdate) { mLogFileOp.Stat(ps); time_t lastMTime = mLastMTime; mLastMTime = ps.GetMtime(); - if (!isOpenOnUpdate || endSize < mLastFilePos ||(endSize == mLastFilePos && lastMTime != mLastMTime)) { + if (!isOpenOnUpdate || mLastFileSignatureSize == 0 || endSize < mLastFilePos || (endSize == mLastFilePos && lastMTime != mLastMTime)) { char firstLine[1025]; int nbytes = mLogFileOp.Pread(firstLine, 1, 1024, 0); if (nbytes < 0) { @@ -1872,7 +1881,7 @@ void LogFileReader::ReadUTF8(LogBuffer& logBuffer, int64_t end, bool& moreData, : 0UL; char* stringBuffer = stringMemory.data; if (nbytes == 0 && (!lastCacheSize || allowRollback)) { // read nothing, if no cached data or allow rollback the - // reader's state cannot be changed + // reader's state cannot be changed return; } if (lastCacheSize) { @@ -2380,7 +2389,8 @@ LogFileReader::~LogFileReader() { ("destruct the corresponding log reader, project", mProjectName)("logstore", mCategory)( "config", mConfigName)("log reader queue name", mHostLogPath)("file device", ToString(mDevInode.dev))( "file inode", ToString(mDevInode.inode))("file signature", mLastFileSignatureHash)( - "file size", mLastFileSize)("last file position", mLastFilePos)); + "file signature size", mLastFileSignatureSize)("file size", mLastFileSize)("last file position", + mLastFilePos)); CloseFilePtr(); // Mark GC so that corresponding resources can be released.