From e0f98a86de728a5214ec794073fcce6f79d688aa Mon Sep 17 00:00:00 2001
From: toni-moreno
Date: Fri, 9 Mar 2018 06:36:04 +0100
Subject: [PATCH] added nmon remote file position control and improved measurement building process

---
 pkg/agent/devices/nmon/import.go     |  38 +++-
 pkg/agent/devices/nmon/nmonfile.go   | 328 +++++++++++++++------------
 pkg/agent/devices/nmon/nmonserver.go |   2 +-
 pkg/agent/devices/nmon/nmonstats.go  | 206 +++++++++++------
 pkg/config/database.go               |  12 +-
 pkg/config/dbconfig.go               |  16 +-
 pkg/config/devicecfg.go              |   2 +-
 pkg/config/nmonfileinfo.go           | 199 ++++++++++++++++
 pkg/data/rfile/rfile.go              |  49 +++-
 9 files changed, 615 insertions(+), 237 deletions(-)
 create mode 100644 pkg/config/nmonfileinfo.go

diff --git a/pkg/agent/devices/nmon/import.go b/pkg/agent/devices/nmon/import.go
index 444322a..0af60b5 100644
--- a/pkg/agent/devices/nmon/import.go
+++ b/pkg/agent/devices/nmon/import.go
@@ -1,6 +1,7 @@
 package nmon
 
 import (
+    "github.com/adejoux/pSeriesCollector/pkg/config"
     "github.com/adejoux/pSeriesCollector/pkg/data/pointarray"
 )
 
@@ -11,18 +12,45 @@ func (d *Server) ImportData(points *pointarray.PointArray) error {
     if d.nmonFile == nil {
         d.Infof("Initializing Nmon Remote File")
         d.nmonFile = NewNmonFile(d.client, d.GetLogger(), d.cfg.NmonFilePath, d.cfg.Name)
-        d.nmonFile.Init()
+        filepos, err := d.nmonFile.Init()
+        if err != nil {
+            d.Errorf("Something happened while initializing the Nmon file: %s", err)
+            return err
+        }
+        // Get last known position
+        info, err := db.GetNmonFileInfoByIDFile(d.cfg.ID, d.nmonFile.CurFile)
+        if err != nil {
+            d.Debugf("Warning on get file info for ID [%s] and file [%s]", d.cfg.ID, d.nmonFile.CurFile)
+            d.Infof("Current File Position %s is: %d", d.nmonFile.CurFile, filepos)
+        } else {
+            d.nmonFile.SetPosition(info.LastPosition)
+            d.Infof("Updated File Position %s now to: %d", d.nmonFile.CurFile, info.LastPosition)
+        }
         d.Debugf("Found Dataseries: %#+v", d.nmonFile.DataSeries)
         d.Debugf("Found Content %s", d.nmonFile.TextContent)
+    }
+
+    if d.nmonFile.ReopenIfChanged() {
+        // if the file has been rotated with a format like /var/log/nmon/%{hostname}_%Y%m%d_%H%M.nmon
+        // the old file has been closed and a new one opened,
+        // so we should now rescan the section/column definitions
+        d.Infof("File %s should be rescanned for new sections/columns", d.nmonFile.CurFile)
+        pos, err := d.nmonFile.InitSectionDefs()
+        if err != nil {
+            return err
+        }
+
+        // the last file has been closed and a new one created
+        // PENDING: delete the last file from FileInfo
+        db.AddOrUpdateNmonFileInfo(&config.NmonFileInfo{ID: d.cfg.ID, DeviceName: d.cfg.Name, FileName: d.nmonFile.CurFile, LastPosition: pos})
     }
 
-    d.nmonFile.UpdateContent()
+    filepos := d.nmonFile.UpdateContent()
 
     // Add last processed lines
     d.nmonFile.ProcessPending(points, d.TagMap)
-
-    d.Debugf("SFTP status %#+v", d.client)
+    d.Infof("Current File Position is [%d] last processed Chunk %s", filepos, d.nmonFile.LastTime.String())
+    db.AddOrUpdateNmonFileInfo(&config.NmonFileInfo{ID: d.cfg.ID, DeviceName: d.cfg.Name, FileName: d.nmonFile.CurFile, LastPosition: filepos})
     return nil
 }
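Taken together, the new ImportData flow is: initialize, look up the stored offset, seek, read, persist. A condensed sketch of that round-trip, assuming the package-level db handle and config types used above (demoPoll itself is a hypothetical helper, not part of the patch):

    // demoPoll sketches one poll cycle of the position round-trip implemented
    // in ImportData: restore the last stored offset for (device, file), read
    // the newly appended lines, then persist the new offset for the next cycle.
    func demoPoll(nf *NmonFile, devID, devName string) error {
        if info, err := db.GetNmonFileInfoByIDFile(devID, nf.CurFile); err == nil {
            // a stored offset exists: resume there instead of re-reading old data
            if serr := nf.SetPosition(info.LastPosition); serr != nil {
                return serr
            }
        }
        pos := nf.UpdateContent() // appends new lines to PendingLines, returns the new offset
        _, err := db.AddOrUpdateNmonFileInfo(&config.NmonFileInfo{
            ID: devID, DeviceName: devName, FileName: nf.CurFile, LastPosition: pos,
        })
        return err
    }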
"github.com/adejoux/pSeriesCollector/pkg/data/pointarray" + "github.com/adejoux/pSeriesCollector/pkg/data/rfile" + "github.com/adejoux/pSeriesCollector/pkg/data/utils" ) var hostRegexp = regexp.MustCompile(`^AAA.host.(\S+)`) @@ -24,9 +25,6 @@ var infoRegexp = regexp.MustCompile(`^AAA.(.*)`) var skipRegexp = regexp.MustCompile(`T0+\W|^Z|^TOP.%CPU`) -var nfsRegexp = regexp.MustCompile(`^NFS`) -var nameRegexp = regexp.MustCompile(`(\d+)$`) - var delimiterRegexp = regexp.MustCompile(`^\w+(.)`) // DataSerie data @@ -50,6 +48,7 @@ type NmonFile struct { HostName string PendingLines []string tzLocation *time.Location + LastTime time.Time } // NewNmonFile create a NmonFile @@ -72,29 +71,40 @@ func (nf *NmonFile) filePathCheck() bool { pattern = strings.Replace(pattern, "%{hostname}", strings.ToLower(nf.HostName), -1) pattern = strings.Replace(pattern, "%{HOSTNAME}", strings.ToUpper(nf.HostName), -1) pattern = strings.Replace(pattern, "%Y", strconv.Itoa(year), -1) - pattern = strings.Replace(pattern, "%m", strconv.Itoa(int(month)), -1) - pattern = strings.Replace(pattern, "%d", strconv.Itoa(day), -1) - pattern = strings.Replace(pattern, "%H", strconv.Itoa(hour), 1) - pattern = strings.Replace(pattern, "%M", strconv.Itoa(min), -1) - pattern = strings.Replace(pattern, "%S", strconv.Itoa(sec), -1) + pattern = strings.Replace(pattern, "%m", fmt.Sprintf("%02d", int(month)), -1) + pattern = strings.Replace(pattern, "%d", fmt.Sprintf("%02d", day), -1) + pattern = strings.Replace(pattern, "%H", fmt.Sprintf("%02d", hour), 1) + pattern = strings.Replace(pattern, "%M", fmt.Sprintf("%02d", min), -1) + pattern = strings.Replace(pattern, "%S", fmt.Sprintf("%02d", sec), -1) if nf.CurFile != pattern { - nf.log.Debugf("Detected Nmon File change OLD [%s] NEW [%s]", nf.CurFile, pattern) + nf.log.Infof("Detected Nmon File change OLD [%s] NEW [%s]", nf.CurFile, pattern) nf.CurFile = pattern return true } return false } -// CheckFile check if file has changed and reopen again if needed -func (nf *NmonFile) CheckFile() { +// SetPosition set remote file at newPos Posistion +func (nf *NmonFile) SetPosition(newpos int64) error { + realpos, err := nf.File.SetPosition(newpos) + if err != nil { + nf.log.Debug("Error on set File %s on expected %d / real %d: error :%s", nf.CurFile, newpos, realpos, err) + return err + } + return nil +} + +// ReopenIfChanged check if file has changed and reopen again if needed +func (nf *NmonFile) ReopenIfChanged() bool { if nf.filePathCheck() { //file should be changed (maybe a rotation? or recreation?) 
 
 // AddNmonSection add new Section
@@ -103,7 +113,7 @@ func (nf *NmonFile) AddNmonSection(line string) {
         return
     }
     if headerRegexp.MatchString(line) {
-        nf.log.Debug("This is not a valid Header Line [%d]")
+        nf.log.Debugf("This line does not have a valid Section header: Line [%s]", line)
         return
     }
 
@@ -116,7 +126,7 @@ func (nf *NmonFile) AddNmonSection(line string) {
 
     elems := strings.Split(line, nf.Delimiter)
     if len(elems) < 3 {
-        nf.log.Errorf("ERROR: parsing the following line : %s\n", line)
+        nf.log.Errorf("ERROR: parsing the following line, not enough columns (min 3): %s\n", line)
         return
     }
     name := elems[0]
@@ -130,15 +140,25 @@ func (nf *NmonFile) AddNmonSection(line string) {
     nf.DataSeries[name] = dataserie
 }
 
-// Init Initialize NmonFile struct
-func (nf *NmonFile) Init() {
-    nf.SetLocation("") //pending set location from system
-    nf.filePathCheck()
-    nf.File = rfile.New(nf.sftpConn, nf.log, nf.CurFile)
+//PENDING: should we do a more accurate sanitize for field names???
+// "Wait% " => "Wait_percent" ??
+// "free(MB)" => "free_mb" ??
+// "eth0-read-KB/s" => eth0_read_kb_s ??
+// "read/s" => "read_s" ??
+
+func sanitize(in string) string {
+    // "User " => "User" ??
+    return strings.TrimSpace(in)
+}
+
+// InitSectionDefs scans the section/column definitions up to the first timestamp line
+func (nf *NmonFile) InitSectionDefs() (int64, error) {
     //Map init
     nf.DataSeries = make(map[string]DataSerie)
     // Get Content
-    data := nf.File.Content()
+    data, pos, err := nf.File.ContentUntilMatch(timeRegexp)
+    if err != nil {
+        return 0, err
+    }
 
     nf.log.Infof("Initializing NMONFILE: %s", nf.FilePattern)
 
     first := true
@@ -208,12 +228,22 @@ func (nf *NmonFile) InitSectionDefs() (int64, error) {
 
     nf.PendingLines = append(nf.PendingLines, data[last:]...)
 
-    nf.log.Debugf("End of NMONFile %s Initialization, pending lines %d", nf.FilePattern, len(nf.PendingLines))
+    return pos, nil
+}
+
+// Init initializes the NmonFile struct and returns the current position once initialized
+func (nf *NmonFile) Init() (int64, error) {
+    nf.SetTimeZoneLocation("") //pending: set location from system
+    nf.filePathCheck()
+    nf.File = rfile.New(nf.sftpConn, nf.log, nf.CurFile)
+    pos, err := nf.InitSectionDefs()
+    nf.log.Debugf("End of NMONFile %s Initialization, pending lines on buffer: [%d] Current file position: [%d]", nf.FilePattern, len(nf.PendingLines), pos)
+    return pos, err
 }
 
 // UpdateContent from remoteFile
-func (nf *NmonFile) UpdateContent() {
-    morelines := nf.File.Content()
+func (nf *NmonFile) UpdateContent() int64 {
+    morelines, pos := nf.File.Content()
     nf.log.Infof("Got new %d lines from NmonFile", len(morelines))
     // replace data if needed depending on the delimiter
     if nf.Delimiter == ";" {
@@ -223,12 +253,13 @@ func (nf *NmonFile) UpdateContent() int64 {
         }
     }
     nf.PendingLines = append(nf.PendingLines, morelines...)
+    return pos
 }
 
 const timeformat = "15:04:05 02-Jan-2006"
 
-//SetLocation set the timezone used to input metrics in InfluxDB
-func (nf *NmonFile) SetLocation(tz string) (err error) {
+//SetTimeZoneLocation sets the timezone used to input metrics in InfluxDB
+func (nf *NmonFile) SetTimeZoneLocation(tz string) (err error) {
     var loc *time.Location
     if len(tz) > 0 {
         loc, err = time.LoadLocation(tz)
@@ -304,6 +335,7 @@ func (nf *NmonFile) ProcessPending(points *pointarray.PointArray, tags map[string]string) {
             continue
         }
         nf.ProcessChunk(points, tags, t, tsID, nmonChunk)
+        nf.LastTime = t
     }
 }
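The Init split above is what makes incremental reading possible: InitSectionDefs consumes the file only up to the first timestamp line, so the section/column catalog is built once and everything from that line on stays in PendingLines for ProcessPending. A simplified sketch of that split — the real method also detects the delimiter and builds the time references; timeRegexp is the timestamp-line pattern it passes to ContentUntilMatch:

    // initSplitSketch: lines before the first timestamp line are treated as
    // section definitions; the matching line itself becomes pending data.
    func initSplitSketch(nf *NmonFile) (int64, error) {
        data, pos, err := nf.File.ContentUntilMatch(timeRegexp)
        if err != nil {
            return 0, err
        }
        for _, line := range data {
            if timeRegexp.MatchString(line) {
                // ContentUntilMatch stops at the match, so this is the last line read
                nf.PendingLines = append(nf.PendingLines, line)
                break
            }
            nf.AddNmonSection(line) // register section name + column list
        }
        return pos, nil
    }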
 
@@ -312,156 +344,156 @@ func (nf *NmonFile) ProcessPending(points *pointarray.PointArray, tags map[string]string) {
 func (nf *NmonFile) ProcessChunk(pa *pointarray.PointArray, Tags map[string]string, t time.Time, timeID string, lines []string) {
     nf.log.Infof("Decoding Chunk for Timestamp %s with %d Elements ", t.String(), len(lines))
 
+    regstr := ""
     for _, line := range lines {
         //check if a header exists to process this data line
         header := strings.Split(line, nf.Delimiter)[0]
         if _, ok := nf.DataSeries[header]; !ok {
-            nf.log.Infof("Line not in Header [%s] tring to add...", line)
+            nf.log.Infof("Line not in Header [%s] trying to add...", line)
             // if not perhaps is a new header
             nf.AddNmonSection(line)
+            if len(regstr) > 0 {
+                regstr = regstr + "|^" + header
+            } else {
+                regstr = "^" + header
+            }
+            continue
         }
-        //PENDING a way to skip metrics
-        if skipRegexp.MatchString(line) {
-            continue
+    }
+    if len(regstr) > 0 {
+        //there are new section definition lines: they must not be processed as data
+        nf.log.Infof("Found section definition lines to filter out REGEX = [%s]", regstr)
+        contains, notcontains := utils.Grep(lines, regexp.MustCompile(regstr))
+        lines = notcontains
+        nf.log.Debugf("CONTAINS: %+v", contains)
+        nf.log.Debugf("NOTCONTAINS: %+v", notcontains)
+    }
+
+    remain := lines
+    var linesok []string
+    var linesnotok []string
+
+    for {
+        if len(remain) == 0 {
+            //exit the loop when no more lines are pending to process
+            break
         }
-        // CPU Stats
-        if cpuallRegexp.MatchString(line) {
-            matched := genStatsRegexp.FindStringSubmatch(line)
-            if matched[1] != timeID {
-                nf.log.Warning("Line not in time TIMEID [%s] : Line :[%s]", timeID, line)
-                continue
-            }
-            nf.processCPUStats(pa, Tags, t, []string{line})
-            continue
+        //Filter all hardcoded skips
+        _, remain = utils.Grep(remain, skipRegexp)
+        //Filter data not belonging to this time interval
+        remain, linesnotok = utils.Grep(remain, regexp.MustCompile(`\W`+timeID))
+        if len(linesnotok) > 0 {
+            nf.log.Warningf("Lines not in time TIMEID [%s] : Lines :[%+v]", timeID, linesnotok)
+        }
+        //-----------------------------------------------------------------------------
+        // We will only process, format and send measurements from known Nmon sections
+        //-----------------------------------------------------------------------------
+
+        //CPU
+        linesok, remain = utils.Grep(remain, cpuRegexp)
+        if len(linesok) > 0 {
+            nf.processCPUStats(pa, Tags, t, linesok)
+        }
+        if len(remain) == 0 {
+            break
         }
         // MEM Stats
-        if memRegexp.MatchString(line) {
-            matched := genStatsRegexp.FindStringSubmatch(line)
-            if matched[1] != timeID {
-                nf.log.Warning("Line not in time TIMEID [%s] : Line :[%s]", timeID, line)
-                continue
-            }
-            nf.processMEMStats(pa, Tags, t, []string{line})
-            continue
+        linesok, remain = utils.Grep(remain, memRegexp)
+        if len(linesok) > 0 {
+            nf.processMEMStats(pa, Tags, t, linesok)
+        }
+        if len(remain) == 0 {
+            break
         }
         // PAGING Stats
-        if pagingRegexp.MatchString(line) {
-            matched := genStatsRegexp.FindStringSubmatch(line)
-            if matched[1] != timeID {
-                nf.log.Warning("Line not in time TIMEID [%s] : Line :[%s]", timeID, line)
-                continue
-            }
-            nf.processColumnAsTags(pa, Tags, t, []string{line}, "paging", "psname", pagingRegexp)
-            continue
+        linesok, remain = utils.Grep(remain, pagingRegexp)
+        if len(linesok) > 0 {
+            nf.processColumnAsTags(pa, Tags, t, linesok, "paging", "psname", pagingRegexp)
+        }
+        if len(remain) == 0 {
+            break
         }
         // DISK Stats
-        if diskRegexp.MatchString(line) {
-            matched := genStatsRegexp.FindStringSubmatch(line)
-            if matched[1] != timeID {
-                nf.log.Warning("Line not in time TIMEID [%s] : Line :[%s]", timeID, line)
-                continue
-            }
-            nf.processColumnAsTags(pa, Tags, t, []string{line}, "disks", "diskname", diskRegexp)
-            continue
+        linesok, remain = utils.Grep(remain, diskRegexp)
+        if len(linesok) > 0 {
+            nf.processColumnAsTags(pa, Tags, t, linesok, "disks", "diskname", diskRegexp)
+        }
+        if len(remain) == 0 {
+            break
         }
         // VG Stats
-        if vgRegexp.MatchString(line) {
-            matched := genStatsRegexp.FindStringSubmatch(line)
-            if matched[1] != timeID {
-                nf.log.Warning("Line not in time TIMEID [%s] : Line :[%s]", timeID, line)
-                continue
-            }
-            nf.processColumnAsTags(pa, Tags, t, []string{line}, "volumegroup", "vgname", vgRegexp)
-            continue
+        linesok, remain = utils.Grep(remain, vgRegexp)
+        if len(linesok) > 0 {
+            nf.processColumnAsTags(pa, Tags, t, linesok, "volumegroup", "vgname", vgRegexp)
+        }
+        if len(remain) == 0 {
+            break
         }
         // JFS Stats
-        if jfsRegexp.MatchString(line) {
-            matched := genStatsRegexp.FindStringSubmatch(line)
-            if matched[1] != timeID {
-                nf.log.Warning("Line not in time TIMEID [%s] : Line :[%s]", timeID, line)
-                continue
-            }
-            nf.processColumnAsTags(pa, Tags, t, []string{line}, "jfs", "fsname", jfsRegexp)
-            continue
+        linesok, remain = utils.Grep(remain, jfsRegexp)
+        if len(linesok) > 0 {
+            nf.processColumnAsTags(pa, Tags, t, linesok, "jfs", "fsname", jfsRegexp)
+        }
+        if len(remain) == 0 {
+            break
         }
         // FC Stats
-        if fcRegexp.MatchString(line) {
-            matched := genStatsRegexp.FindStringSubmatch(line)
-            if matched[1] != timeID {
-                nf.log.Warning("Line not in time TIMEID [%s] : Line :[%s]", timeID, line)
-                continue
-            }
-            nf.processColumnAsTags(pa, Tags, t, []string{line}, "fiberchannel", "fcname", fcRegexp)
-            continue
+        linesok, remain = utils.Grep(remain, fcRegexp)
+        if len(linesok) > 0 {
+            nf.processColumnAsTags(pa, Tags, t, linesok, "fiberchannel", "fcname", fcRegexp)
+        }
+        if len(remain) == 0 {
+            break
         }
         // DG Stats
-        if dgRegexp.MatchString(line) {
-            matched := genStatsRegexp.FindStringSubmatch(line)
-            if matched[1] != timeID {
-                nf.log.Warning("Line not in time TIMEID [%s] : Line :[%s]", timeID, line)
-                continue
-            }
-            nf.processColumnAsTags(pa, Tags, t, []string{line}, "diskgroup", "dgname", dgRegexp)
-            continue
+        linesok, remain = utils.Grep(remain, dgRegexp)
+        if len(linesok) > 0 {
+            nf.processColumnAsTags(pa, Tags, t, linesok, "diskgroup", "dgname", dgRegexp)
+        }
+        if len(remain) == 0 {
+            break
         }
-
-        //NET stats
-        if netRegexp.MatchString(line) {
-            matched := genStatsRegexp.FindStringSubmatch(line)
-            if matched[1] != timeID {
-                nf.log.Warning("Line not in time TIMEID [%s] : Line :[%s]", timeID, line)
-                continue
-            }
-            nf.processMixedColumnAsFieldAndTags(pa, Tags, t, []string{line}, "network", "ifname")
-            continue
+        linesok, remain = utils.Grep(remain, netRegexp)
+        if len(linesok) > 0 {
+            nf.processMixedColumnAsFieldAndTags(pa, Tags, t, linesok, "network", "ifname")
+        }
+        if len(remain) == 0 {
+            break
         }
-
-        //SEA stats
-        if seaRegexp.MatchString(line) {
-            matched := genStatsRegexp.FindStringSubmatch(line)
-            if matched[1] != timeID {
-                nf.log.Warning("Line not in time TIMEID [%s] : Line :[%s]", timeID, line)
-                continue
-            }
-            nf.processMixedColumnAsFieldAndTags(pa, Tags, t, []string{line}, "sea", "seaname")
-            continue
+        linesok, remain = utils.Grep(remain, seaRegexp)
+        if len(linesok) > 0 {
+            nf.processMixedColumnAsFieldAndTags(pa, Tags, t, linesok, "sea", "seaname")
+        }
+        if len(remain) == 0 {
+            break
         }
-
-        //IOADAPT stats
-        if ioadaptRegexp.MatchString(line) {
-            matched := genStatsRegexp.FindStringSubmatch(line)
-            if matched[1] != timeID {
-                nf.log.Warning("Line not in time TIMEID [%s] : Line :[%s]", timeID, line)
-                continue
-            }
-            nf.processMixedColumnAsFieldAndTags(pa, Tags, t, []string{line}, "ioadapt", "adaptname")
-            continue
+        linesok, remain = utils.Grep(remain, ioadaptRegexp)
+        if len(linesok) > 0 {
+            nf.processMixedColumnAsFieldAndTags(pa, Tags, t, linesok, "ioadapt", "adaptname")
+        }
+        if len(remain) == 0 {
+            break
         }
-
-        //TOP stats
-        if topRegexp.MatchString(line) {
-            matched := topRegexp.FindStringSubmatch(line)
-            if matched[1] != timeID {
-                nf.log.Warning("Line not in time TIMEID [%s] : Line :[%s]", timeID, line)
-                continue
-            }
-            nf.processTopStats(pa, Tags, t, []string{line})
-            continue
+        linesok, remain = utils.Grep(remain, topRegexp)
+        if len(linesok) > 0 {
+            nf.processTopStats(pa, Tags, t, linesok)
         }
-
-        //Other Generic Stats
-        if genStatsRegexp.MatchString(line) {
-            matched := genStatsRegexp.FindStringSubmatch(line)
-            if matched[1] != timeID {
-                nf.log.Warning("Line not in time TIMEID [%s] : Line :[%s]", timeID, line)
-                continue
-            }
-            nf.processGenericStats(pa, Tags, t, line)
-            continue
+        if len(remain) == 0 {
+            break
+        }
+        //POOLS,LPAR,PAGE,PROC,PROCAIO,FILE,VM
+        linesok, remain = utils.Grep(remain, columAsFieldRegexp)
+        if len(linesok) > 0 {
+            nf.processColumnAsField(pa, Tags, t, linesok)
+        }
+        if len(remain) != 0 {
+            nf.log.Warnf("Lines not processed [%+v]: perhaps not in the catalog...", remain)
         }
-
-        nf.log.Warnf("Line not processed [%s] adding to the header definitions...", line)
+        break
     }
 }
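The rewrite above turns ProcessChunk from per-line regex dispatch into successive set subtraction: each utils.Grep call peels one family of sections off `remain`, so a line is matched against each family at most once, and whatever survives the chain is genuinely unknown. The patch relies on Grep splitting a slice into matching and non-matching halves; a minimal implementation with that inferred contract (an assumption — the real helper lives in pkg/data/utils):

    // grep splits lines into those matching re and the rest, preserving order —
    // the contract the calls above depend on.
    func grep(lines []string, re *regexp.Regexp) (matched, rest []string) {
        for _, l := range lines {
            if re.MatchString(l) {
                matched = append(matched, l)
            } else {
                rest = append(rest, l)
            }
        }
        return matched, rest
    }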
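For NET/SEA/IOADAPT sections, each column name carries both the tag value and the field name, split at the first "-" or "_" by tagfieldRegexp (`^([^_-]*)[_-]{1}(.*)`). A worked example (interface names are illustrative):

    // tagFieldSplit decomposes a mixed column name:
    //   "eth0-read-KB/s" => tag "eth0", field "read-KB/s"
    //   "en4_write-KB/s" => tag "en4",  field "write-KB/s"
    //   "br-1a2b3c"      => tag "br",   field "1a2b3c"  (the docker case warned about above)
    func tagFieldSplit(col string) (tag, field string, ok bool) {
        m := regexp.MustCompile(`^([^_-]*)[_-]{1}(.*)`).FindStringSubmatch(col)
        if len(m) < 3 {
            return "", "", false
        }
        return m[1], m[2], true
    }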
diff --git a/pkg/agent/devices/nmon/nmonserver.go b/pkg/agent/devices/nmon/nmonserver.go
index ed8d70e..10e62e8 100644
--- a/pkg/agent/devices/nmon/nmonserver.go
+++ b/pkg/agent/devices/nmon/nmonserver.go
@@ -38,7 +38,7 @@ func Ping(c *config.DeviceCfg, log *logrus.Logger, apidbg bool, filename string) {
         return nil, elapsed, "test", err
     }
 
-    return sftp, elapsed, "test", nil
+    return sftp, elapsed, "SFTP test", nil
 }
 
 //ScanNmonDevice scan Device
diff --git a/pkg/agent/devices/nmon/nmonstats.go b/pkg/agent/devices/nmon/nmonstats.go
index fca6281..c59c0a8 100644
--- a/pkg/agent/devices/nmon/nmonstats.go
+++ b/pkg/agent/devices/nmon/nmonstats.go
@@ -1,26 +1,15 @@
 package nmon
 
 import (
-    "github.com/adejoux/pSeriesCollector/pkg/data/pointarray"
-
-    "github.com/adejoux/pSeriesCollector/pkg/data/utils"
     "math"
     "regexp"
     "strconv"
     "strings"
     "time"
-)
-
-//PENDING : should we do a more acurated sanitize for field names ???
-// "Wait% " => "Wait_percent" ??
-// "free(MB)" => "free_mb" ??
-// "eth0-read-KB/s" => eth0_read_kb_s ??
-// "read/s" => "read_s" ??
 
-func sanitize(in string) string {
-    // "User " => "User" ??
-    return strings.TrimSpace(in)
-}
+    "github.com/adejoux/pSeriesCollector/pkg/data/pointarray"
+    "github.com/adejoux/pSeriesCollector/pkg/data/utils"
+)
 
 //-----------------------------------------------------------
 // Handle CPU Measurements
@@ -33,7 +22,7 @@
 //SCPU01,SCPU N vio4839n2,User ,Sys ,Wait ,Idle
 
 // OLD var cpuallRegexp = regexp.MustCompile(`^CPU\d+|^SCPU\d+|^PCPU\d+`)
-var cpuallRegexp = regexp.MustCompile(`^CPU\d+|^SCPU\d+|^PCPU\d+|^+CPU_ALL`)
+var cpuRegexp = regexp.MustCompile(`^CPU\d+|^SCPU\d+|^PCPU\d+|^+CPU_ALL`)
 
 func (nf *NmonFile) processCPUStats(pa *pointarray.PointArray, Tags map[string]string, t time.Time, lines []string) {
@@ -56,7 +45,7 @@ func (nf *NmonFile) processCPUStats(pa *pointarray.PointArray, Tags map[string]string, t time.Time, lines []string) {
             // try to convert string to integer
             converted, parseErr := strconv.ParseFloat(value, 64)
             if parseErr != nil || math.IsNaN(converted) {
-                nf.log.Warnf("There is some trouble to convert data to float in value [%s] :%s ", value, parseErr)
+                nf.log.Warnf("Trouble converting column (%d) of Line [%s]: value [%s] is not a float: %s", i+2, line, value, parseErr)
                 //if not working, skip to next value. We don't want text values in InfluxDB.
                 continue
             }
@@ -102,7 +91,8 @@ func (nf *NmonFile) processMEMStats(pa *pointarray.PointArray, Tags map[string]string, t time.Time, lines []string) {
             // try to convert string to integer
             converted, parseErr := strconv.ParseFloat(value, 64)
             if parseErr != nil || math.IsNaN(converted) {
-                nf.log.Warnf("There is some trouble to convert data to float in value [%s] :%s ", value, parseErr)
+                nf.log.Warnf("Trouble converting column (%d) of Line [%s]: value [%s] is not a float: %s", i+2, line, value, parseErr)
+                //if not working, skip to next value. We don't want text values in InfluxDB.
                 continue
             }
@@ -142,7 +132,6 @@ func (nf *NmonFile) processTopStats(pa *pointarray.PointArray, Tags map[string]string, t time.Time, lines []string) {
     nf.log.Debugf("Processing TOP stats: %+v", lines)
     for _, line := range lines {
         elems := strings.Split(line, nf.Delimiter)
-        name := elems[0]
 
         if len(elems) < 14 {
             nf.log.Errorf("error TOP import: Elements [%s]", elems)
@@ -159,7 +148,6 @@ func (nf *NmonFile) processTopStats(pa *pointarray.PointArray, Tags map[string]string, t time.Time, lines []string) {
         }
 
         tags := utils.MapDupAndAdd(Tags, map[string]string{
-            "host":    nf.HostName,
             "pid":     elems[1],
             "command": elems[13],
             "wlm":     wlmclass,
@@ -175,7 +163,8 @@ func (nf *NmonFile) processTopStats(pa *pointarray.PointArray, Tags map[string]string, t time.Time, lines []string) {
             // try to convert string to integer
             converted, parseErr := strconv.ParseFloat(value, 64)
             if parseErr != nil {
-                nf.log.Warnf("There is some trouble to convert data to float in value [%s] :%s ", value, parseErr)
+                nf.log.Warnf("Trouble converting column (%d) of Line [%s]: value [%s] is not a float: %s", i+2, line, value, parseErr)
+                //if not working, skip to next value. We don't want text values in InfluxDB.
                 continue
             }
 
@@ -183,7 +172,7 @@ func (nf *NmonFile) processTopStats(pa *pointarray.PointArray, Tags map[string]string, t time.Time, lines []string) {
             fields[column] = converted
         }
 
-        pa.Append(name, tags, fields, t)
+        pa.Append("top", tags, fields, t)
     }
 }
 
@@ -232,15 +221,21 @@ func (nf *NmonFile) processMixedColumnAsFieldAndTags(pa *pointarray.PointArray, Tags map[string]string, t time.Time, lines []string, measname string, tagname string) {
 
     measurements := make(map[string]map[string]interface{})
 
     //this regex could generate bugs on systems with docker autogenerated bridges named "br-[NETWORK_ID]"
+    // as a workaround one could force the interface name:
+    // https://stackoverflow.com/questions/43981224/docker-how-to-set-iface-name-when-creating-a-new-network
+
     var tagfieldRegexp = regexp.MustCompile(`^([^_-]*)[_-]{1}(.*)`)
 
     for _, line := range lines {
         elems := strings.Split(line, nf.Delimiter)
         name := elems[0]
-        for _, col := range nf.DataSeries[name].Columns {
+        for k, col := range nf.DataSeries[name].Columns {
+            if len(col) == 0 {
+                continue
+            }
             matched := tagfieldRegexp.FindStringSubmatch(col)
             if len(matched) < 3 {
-                nf.log.Warning("There is some trouble on getting tagname-fieldname from column: %+v, column: %s", matched, col)
+                nf.log.Warnf("Trouble getting tagname-fieldname from column# [%d] value [%s] size [%d] AllColumns [%+v] Matched [%+v]", k, col, len(col), nf.DataSeries[name], matched)
                 continue
             }
             tag := matched[1]
@@ -248,7 +243,7 @@ func (nf *NmonFile) processMixedColumnAsFieldAndTags(pa *pointarray.PointArray,
             if _, ok := measurements[tag]; !ok {
                 measurements[tag] = make(map[string]interface{})
             }
-            measurements[tag][field] = false
+            measurements[tag][field] = nil
         }
     }
     nf.log.Debugf("Detected Struct %+v", measurements)
@@ -264,9 +259,13 @@ func (nf *NmonFile) processMixedColumnAsFieldAndTags(pa *pointarray.PointArray,
             continue
         }
         column := nf.DataSeries[name].Columns[i]
+        //on NET devices data could finish with "," => NET,a,b,c,
+        if len(column) == 0 {
+            continue
+        }
         matched := tagfieldRegexp.FindStringSubmatch(column)
         if len(matched) < 3 {
-            nf.log.Warning("There is some trouble on getting tagname-fieldname from column: %+v", matched)
+            nf.log.Warnf("Trouble getting tagname-fieldname from column (%d) Columns [%+v] Matched [%+v]", i, nf.DataSeries[name], matched)
             continue
         }
         tag := matched[1]
@@ -275,7 +274,8 @@ func (nf *NmonFile) processMixedColumnAsFieldAndTags(pa *pointarray.PointArray,
         // try to convert string to integer
         converted, parseErr := strconv.ParseFloat(value, 64)
         if parseErr != nil || math.IsNaN(converted) {
-            nf.log.Warnf("There is some trouble to convert data to float in value [%s] :%s ", value, parseErr)
+            nf.log.Warnf("Trouble converting column (%d) of Line [%s]: value [%s] is not a float: %s", i+2, line, value, parseErr)
+            //if not working, skip to next value. We don't want text values in InfluxDB.
             continue
         }
 
@@ -289,7 +289,12 @@ func (nf *NmonFile) processMixedColumnAsFieldAndTags(pa *pointarray.PointArray,
 
     for kmeas, meas := range measurements {
         tags := utils.MapDupAndAdd(Tags, map[string]string{tagname: kmeas})
-        //TODO clean fields with not float number
+        //clean fields that were not provided in this interval
+        for k, v := range meas {
+            if v == nil {
+                delete(meas, k)
+            }
+        }
         pa.Append(measname, tags, meas, t)
     }
 }
 
@@ -358,124 +363,179 @@ var dgRegexp = regexp.MustCompile(`^DG([a-zA-Z]*).*`)
 //DGSIZE,Disk Group Block Size KB XXXXXXX
 //DGXFER,Disk Group Transfers/s XXXXXXX
 
+func fieldFromLine(line string, reg *regexp.Regexp) string {
+    //fieldname from Section name in lowercase
+    matched := reg.FindStringSubmatch(line)
+    var fieldname string
+    //if the regexp captured a suffix use it, otherwise use the whole match
+    if len(matched[1]) > 0 {
+        fieldname = strings.ToLower(matched[1])
+    } else {
+        fieldname = strings.ToLower(matched[0])
+    }
+    return fieldname
+}
+
 func (nf *NmonFile) processColumnAsTags(pa *pointarray.PointArray, Tags map[string]string, t time.Time, lines []string, measname string, tagname string, fieldregexp *regexp.Regexp) {
     nf.log.Debugf("Processing ColumnAsTags [%s][%s] stats: %+v", measname, tagname, lines)
     //these kinds of lines have the field encoded in the line header
     // example: DISKREAD,dddd,a,b,c,d => { 4 * { Measurement = Disk Field = "read" } and each one with one tag disk={a,b,c,d}}
-    //PENDING : to do a buffer to construct only len(nf.DataSeries[name].Columns) measurements
+    measurements := make(map[string]map[string]interface{})
+
     for _, line := range lines {
+        fieldname := fieldFromLine(line, fieldregexp)
+        //Tags from Column Names
         elems := strings.Split(line, nf.Delimiter)
         name := elems[0]
-
-        matched := fieldregexp.FindStringSubmatch(line)
-        var fieldname string
-        //if not data
-        if len(matched[1]) > 0 {
-            fieldname = matched[1]
-        } else {
-            fieldname = matched[0]
+        for _, col := range nf.DataSeries[name].Columns {
+            tag := strings.ToLower(col)
+            if _, ok := measurements[tag]; !ok {
+                measurements[tag] = make(map[string]interface{})
+            }
+            measurements[tag][fieldname] = nil
         }
+    }
+    nf.log.Debugf("Detected Struct %+v", measurements)
+
+    for _, line := range lines {
+
+        elems := strings.Split(line, nf.Delimiter)
+        name := elems[0]
+
+        field := fieldFromLine(line, fieldregexp)
 
         for i, value := range elems[2:] {
             if len(nf.DataSeries[name].Columns) < i+1 {
                 nf.log.Warnf("Entry added position %d in serie %s since nmon start: skipped COLUMNS [%#+v] Line [%s]", i+1, name, nf.DataSeries[name], line)
                 continue
             }
-            column := nf.DataSeries[name].Columns[i]
-            columnTag := utils.MapDupAndAdd(Tags, map[string]string{tagname: strings.ToLower(column)})
-
+            tag := strings.ToLower(nf.DataSeries[name].Columns[i])
             // try to convert string to integer
             converted, parseErr := strconv.ParseFloat(value, 64)
             if parseErr != nil || math.IsNaN(converted) {
-                nf.log.Warnf("There is some trouble to convert data to float in value [%s] :%s ", value, parseErr)
+                nf.log.Warnf("Trouble converting column (%d) of Line [%s]: value [%s] is not a float: %s", i+2, line, value, parseErr)
+                //if not working, skip to next value. We don't want text values in InfluxDB.
                 continue
             }
-            fields := make(map[string]interface{})
-            fields[fieldname] = converted
-            pa.Append(measname, columnTag, fields, t)
+            measurements[tag][field] = converted
         }
     }
     //only one measurement (one write) per tag is needed because memnew/memuse/mem have different fields and no tags
+    //now we can send all the generated data
+
+    for kmeas, meas := range measurements {
+        tags := utils.MapDupAndAdd(Tags, map[string]string{tagname: kmeas})
+        //clean fields that were not provided in this interval
+        for k, v := range meas {
+            if v == nil {
+                delete(meas, k)
+            }
+        }
+        pa.Append(measname, tags, meas, t)
+    }
 }
 
+//------------------------------------------------------------
+// Handle GenericColumnAsField Lines
+//------------------------------------------------------------
+// Expected Measurements
 //-----------------------------------------------------------
-// Handle Generic Metrics ( Any other not previously handled with Time TAG TXXXX)
-//
-//SOMETHING,TXXX , XXXXXXXXX
-//
 // expected to be handled by these
 //--------------------
 //- Measurement POOLS
 //--------------------
 // -- AIX / VIO --
+var poolsRegexp = regexp.MustCompile(`^POOLS`)
+
 //POOLS,Multiple CPU Pools XXXX,shcpus_in_sys,max_pool_capacity,entitled_pool_capacity,pool_max_time,pool_busy_time,shcpu_tot_time,shcpu_busy_time,Pool_id,entitled
 
 //--------------------
 //- Measurement LPAR
 //--------------------
+var lparRegexp = regexp.MustCompile(`^LPAR`)
+
 //LPAR,Logical Partition XXXX,PhysicalCPU,virtualCPUs,logicalCPUs,poolCPUs,entitled,weight,PoolIdle,usedAllCPU%,usedPoolCPU%,SharedCPU,Capped,EC_User%,EC_Sys%,EC_Wait%,EC_Idle%,VP_User%,VP_Sys%,VP_Wait%,VP_Idle%,Folded,Pool_id
 
 //--------------------
 //- Measurement PAGE
 //--------------------
+var pageRegexp = regexp.MustCompile(`^PAGE`)
+
 //PAGE,Paging XXXX,faults,pgin,pgout,pgsin,pgsout,reclaims,scans,cycles
 
 //--------------------
 //- Measurement PROC
 //--------------------
+var procRegexp = regexp.MustCompile(`^PROC`)
+
 //PROC,Processes XXXX,Runnable,Swap-in,pswitch,syscall,read,write,fork,exec,sem,msg,asleep_bufio,asleep_rawio,asleep_diocio
 
 //--------------------
 //- Measurement AIO
 //--------------------
+var procaioRegexp = regexp.MustCompile(`^PROCAIO`)
+
 //PROCAIO,Asynchronous I/O XXXX,aioprocs,aiorunning,aiocpu
 
 //--------------------
 //- Measurement FILE
 //--------------------
+var fileRegexp = regexp.MustCompile(`^FILE`)
+
 //FILE,File I/O XXXX,iget,namei,dirblk,readch,writech,ttyrawch,ttycanch,ttyoutch
 
 //--------------------
 //- Measurement VM
 //--------------------
+var vmRegexp = regexp.MustCompile(`^VM`)
+
 //VM,Paging and Virtual Memory,nr_dirty,nr_writeback,nr_unstable,nr_page_table_pages,nr_mapped,nr_slab,pgpgin,pgpgout,pswpin,pswpout,pgfree,pgactivate,pgdeactivate,pgfault,pgmajfault,pginodesteal,slabs_scanned,kswapd_steal,kswapd_inodesteal,pageoutrun,allocstall,pgrotated,pgalloc_high,pgalloc_normal,pgalloc_dma,pgrefill_high,pgrefill_normal,pgrefill_dma,pgsteal_high,pgsteal_normal,pgsteal_dma,pgscan_kswapd_high,pgscan_kswapd_normal,pgscan_kswapd_dma,pgscan_direct_high,pgscan_direct_normal,pgscan_direct_dma
 
 var genStatsRegexp = regexp.MustCompile(`\W(T\d{4,16})`)
+var nfsRegexp = regexp.MustCompile(`^NFS`)
+var nameRegexp = regexp.MustCompile(`(\d+)$`)
+
+var columAsFieldRegexp = regexp.MustCompile(`^POOLS,|^LPAR,|^PAGE,|^PROC,|^PROCAIO,|^FILE,|^VM,`)
 
+func (nf *NmonFile) processColumnAsField(pa *pointarray.PointArray, Tags map[string]string, t time.Time, lines []string) {
+    nf.log.Debugf("Processing ColumnAsField stats: %+v", lines)
+
+    for _, line := range lines {
+        elems := strings.Split(line, nf.Delimiter)
+        name := elems[0]
+
+        //no need to create new tags
+
+        fields := make(map[string]interface{})
+
+        for i, value := range elems[2:] {
+            if len(nf.DataSeries[name].Columns) < i+1 {
+                nf.log.Warnf("Entry added position %d in serie %s since nmon start: skipped COLUMNS [%#+v] Line [%s]", i+1, name, nf.DataSeries[name], line)
+                continue
+            }
+
+            // try to convert string to integer
+            converted, parseErr := strconv.ParseFloat(value, 64)
+            if parseErr != nil || math.IsNaN(converted) {
+                nf.log.Warnf("Trouble converting column (%d) of Line [%s]: value [%s] is not a float: %s", i+2, line, value, parseErr)
+                //if not working, skip to next value. We don't want text values in InfluxDB.
+                continue
+            }
+            column := nf.DataSeries[name].Columns[i]
+            fields[column] = converted
+        }
+
+        measurement := ""
+        if nfsRegexp.MatchString(name) || cpuRegexp.MatchString(name) {
+            measurement = name
+        } else {
+            measurement = nameRegexp.ReplaceAllString(name, "")
+        }
+
+        pa.Append(strings.ToLower(measurement), Tags, fields, t)
+    }
+}
-
-func (nf *NmonFile) processGenericStats(pa *pointarray.PointArray, Tags map[string]string, t time.Time, line string) {
-    nf.log.Debugf("Processing Generic stats: %s", line)
-
-    elems := strings.Split(line, nf.Delimiter)
-    name := elems[0]
-
-    //no need to create new tags
-
-    fields := make(map[string]interface{})
-
-    for i, value := range elems[2:] {
-        if len(nf.DataSeries[name].Columns) < i+1 {
-            nf.log.Warnf("Entry added position %d in serie %s since nmon start: skipped COLUMNS [%#+v] Line [%s]", i+1, name, nf.DataSeries[name], line)
-            continue
-        }
-
-        // try to convert string to integer
-        converted, parseErr := strconv.ParseFloat(value, 64)
-        if parseErr != nil || math.IsNaN(converted) {
-            nf.log.Warnf("There is some trouble to convert data to float in value [%s] :%s ", value, parseErr)
-            //if not working, skip to next value. We don't want text values in InfluxDB.
-            continue
-        }
-        column := nf.DataSeries[name].Columns[i]
-        fields[column] = converted
-    }
-
-    measurement := ""
-    if nfsRegexp.MatchString(name) || cpuallRegexp.MatchString(name) {
-        measurement = name
-    } else {
-        measurement = nameRegexp.ReplaceAllString(name, "")
-    }
-
-    pa.Append(measurement, Tags, fields, t)
-}
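To make the new buffered shape concrete: for a column-as-tags section such as DISK, the first pass declares every (tag, field) cell with nil, the second pass fills the cells, and the flush emits one point per tag instead of one point per field as the old code did. With these two chunk lines (values invented) and the columns hdisk0,hdisk1 registered earlier:

    // DISKREAD,T0042,12.5,3.0
    // DISKWRITE,T0042,7.1,0.0
    //
    // the buffer flushed as measurement "disks", one point per diskname tag:
    var expected = map[string]map[string]interface{}{
        "hdisk0": {"read": 12.5, "write": 7.1},
        "hdisk1": {"read": 3.0, "write": 0.0},
    }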
diff --git a/pkg/config/database.go b/pkg/config/database.go
index 075552a..e2ab9f3 100644
--- a/pkg/config/database.go
+++ b/pkg/config/database.go
@@ -88,6 +88,10 @@ func (dbc *DatabaseCfg) InitDB() {
     if err = dbc.x.Sync(new(DeviceCfg)); err != nil {
         log.Fatalf("Fail to sync database DeviceCfg: %v\n", err)
     }
+
+    if err = dbc.x.Sync(new(NmonFileInfo)); err != nil {
+        log.Fatalf("Fail to sync database NmonFileInfo: %v\n", err)
+    }
 }
 
 //LoadDbConfig get data from database
@@ -103,11 +107,17 @@ func (dbc *DatabaseCfg) LoadDbConfig(cfg *DBConfig) {
     //Load HMC engines map
     cfg.HMC, err = dbc.GetHMCCfgMap("")
     if err != nil {
-        log.Warningf("Some errors on get Influx Ouput servers URL :%v", err)
+        log.Warningf("Some errors on get HMC servers: %v", err)
     }
 
     //Load Devices map
     cfg.Devices, err = dbc.GetDeviceCfgMap("")
+    if err != nil {
+        log.Warningf("Some errors on get Devices: %v", err)
+    }
+
+    //Load NmonFileInfo map
+    cfg.NmonFileInfo, err = dbc.GetNmonFileInfoMap("")
     if err != nil {
-        log.Warningf("Some errors on get Influx Ouput servers URL :%v", err)
+        log.Warningf("Some errors on get NmonFileInfo: %v", err)
     }
diff --git a/pkg/config/dbconfig.go b/pkg/config/dbconfig.go
index 2371c8c..287ba93 100644
--- a/pkg/config/dbconfig.go
+++ b/pkg/config/dbconfig.go
@@ -88,11 +88,21 @@ type DeviceCfg struct {
     Description string `xorm:"description"`
 }
 
+// NmonFileInfo
stores the last read position for each Device/Nmon file
+type NmonFileInfo struct {
+    ID           string `xorm:"'id' unique" binding:"Required"`
+    DeviceName   string `xorm:"device_name" binding:"Required"`
+    FileName     string `xorm:"file_name" binding:"Required"`
+    LastPosition int64  `xorm:"last_position"`
+    LastTime     string `xorm:"last_time"`
+}
+
 // DBConfig read from DB
 type DBConfig struct {
-    Influxdb map[string]*InfluxCfg
-    HMC      map[string]*HMCCfg
-    Devices  map[string]*DeviceCfg
+    Influxdb     map[string]*InfluxCfg
+    HMC          map[string]*HMCCfg
+    Devices      map[string]*DeviceCfg
+    NmonFileInfo map[string]*NmonFileInfo
 }
 
 // Init initializes the DB
diff --git a/pkg/config/devicecfg.go b/pkg/config/devicecfg.go
index 00f3047..1f8a5f7 100644
--- a/pkg/config/devicecfg.go
+++ b/pkg/config/devicecfg.go
@@ -110,7 +110,7 @@ func (dbc *DatabaseCfg) DelDeviceCfg(id string) (int64, error) {
     return affected, nil
 }
 
-// AddOrUpdateIfxDBCfg this method insert data if not previouosly exist the tuple ifxServer.Name or update it if already exist
+// AddOrUpdateDeviceCfg inserts the device if it does not previously exist, or updates it if it does
 func (dbc *DatabaseCfg) AddOrUpdateDeviceCfg(dev *DeviceCfg) (int64, error) {
     log.Debugf("ADD OR UPDATE %+v", dev)
     //check if exist
diff --git a/pkg/config/nmonfileinfo.go b/pkg/config/nmonfileinfo.go
new file mode 100644
index 0000000..1a2877c
--- /dev/null
+++ b/pkg/config/nmonfileinfo.go
@@ -0,0 +1,199 @@
+package config
+
+import "fmt"
+
+/***************************
+    NmonFileInfo DB backend
+    -GetNmonFileInfoByIDFile (struct)
+    -GetNmonFileInfoByID (struct)
+    -GetNmonFileInfoMap (map - for internal config use)
+    -GetNmonFileInfoArray (array - for web UI use)
+    -AddNmonFileInfo
+    -DelNmonFileInfo
+    -UpdateNmonFileInfo
+    -GetNmonFileInfoAffectOnDel
+***********************************/
+
+/*GetNmonFileInfoByIDFile get NmonFileInfo data by device id and file name*/
+func (dbc *DatabaseCfg) GetNmonFileInfoByIDFile(id string, filename string) (NmonFileInfo, error) {
+    cfgarray, err := dbc.GetNmonFileInfoArray("id='" + id + "' and file_name='" + filename + "'")
+    if err != nil {
+        return NmonFileInfo{}, err
+    }
+    if len(cfgarray) > 1 {
+        return NmonFileInfo{}, fmt.Errorf("Error: %d results on get NmonFileInfo by id %s", len(cfgarray), id)
+    }
+    if len(cfgarray) == 0 {
+        return NmonFileInfo{}, fmt.Errorf("Error: no values have been returned with this id %s in the nmon file info table", id)
+    }
+    return *cfgarray[0], nil
+}
+
+/*GetNmonFileInfoByID get NmonFileInfo data by device id*/
+func (dbc *DatabaseCfg) GetNmonFileInfoByID(id string) (NmonFileInfo, error) {
+    cfgarray, err := dbc.GetNmonFileInfoArray("id='" + id + "'")
+    if err != nil {
+        return NmonFileInfo{}, err
+    }
+    if len(cfgarray) > 1 {
+        return NmonFileInfo{}, fmt.Errorf("Error: %d results on get NmonFileInfo by id %s", len(cfgarray), id)
+    }
+    if len(cfgarray) == 0 {
+        return NmonFileInfo{}, fmt.Errorf("Error: no values have been returned with this id %s in the nmon file info table", id)
+    }
+    return *cfgarray[0], nil
+}
+
+/*GetNmonFileInfoMap return data in map format*/
+func (dbc *DatabaseCfg) GetNmonFileInfoMap(filter string) (map[string]*NmonFileInfo, error) {
+    cfgarray, err := dbc.GetNmonFileInfoArray(filter)
+    cfgmap := make(map[string]*NmonFileInfo)
+    for _, val := range cfgarray {
+        cfgmap[val.ID] = val
+        log.Debugf("%+v", *val)
+    }
+    return cfgmap, err
+}
+
+/*GetNmonFileInfoArray generate an array of NmonFileInfo entries with all their information*/
+func (dbc *DatabaseCfg) GetNmonFileInfoArray(filter string) ([]*NmonFileInfo, error) {
+    var err error
+    var devices
[]*NmonFileInfo
+    //Get only data for the selected devices
+    if len(filter) > 0 {
+        if err = dbc.x.Where(filter).Find(&devices); err != nil {
+            log.Warnf("Fail to get NmonFileInfo data filtered with %s : %v\n", filter, err)
+            return nil, err
+        }
+    } else {
+        if err = dbc.x.Find(&devices); err != nil {
+            log.Warnf("Fail to get NmonFileInfo data: %v\n", err)
+            return nil, err
+        }
+    }
+    return devices, nil
+}
+
+/*AddNmonFileInfo for adding new entries*/
+func (dbc *DatabaseCfg) AddNmonFileInfo(dev NmonFileInfo) (int64, error) {
+    var err error
+    var affected int64
+    session := dbc.x.NewSession()
+    defer session.Close()
+
+    affected, err = session.Insert(dev)
+    if err != nil {
+        session.Rollback()
+        return 0, err
+    }
+    //no other relation
+    err = session.Commit()
+    if err != nil {
+        return 0, err
+    }
+    log.Infof("Added new NmonFileInfo successfully with id %s", dev.ID)
+    dbc.addChanges(affected)
+    return affected, nil
+}
+
+/*DelNmonFileInfo for deleting NmonFileInfo entries from their ID*/
+func (dbc *DatabaseCfg) DelNmonFileInfo(id string) (int64, error) {
+    var affecteddev, affected int64
+    var err error
+
+    session := dbc.x.NewSession()
+    defer session.Close()
+    // deleting references in HMCCfg
+
+    affecteddev, err = session.Where("outdb='" + id + "'").Cols("outdb").Update(&HMCCfg{})
+    if err != nil {
+        session.Rollback()
+        return 0, fmt.Errorf("Error on Delete NmonFileInfo: updating HMCCfg references for id %s, error: %s", id, err)
+    }
+
+    affected, err = session.Where("id='" + id + "'").Delete(&NmonFileInfo{})
+    if err != nil {
+        session.Rollback()
+        return 0, err
+    }
+
+    err = session.Commit()
+    if err != nil {
+        return 0, err
+    }
+    log.Infof("Deleted NmonFileInfo with ID %s successfully [ %d devices affected ]", id, affecteddev)
+    dbc.addChanges(affected + affecteddev)
+    return affected, nil
+}
+
+// AddOrUpdateNmonFileInfo inserts the entry if it does not previously exist, or updates it if it does
+func (dbc *DatabaseCfg) AddOrUpdateNmonFileInfo(dev *NmonFileInfo) (int64, error) {
+    log.Debugf("ADD OR UPDATE %+v", dev)
+    //check if exist
+    m, err := dbc.GetNmonFileInfoArray("id == '" + dev.ID + "'")
+    if err != nil {
+        return 0, err
+    }
+    switch len(m) {
+    case 1:
+        log.Debugf("Updating NmonFileInfo %+v", m)
+        return dbc.UpdateNmonFileInfo(m[0].ID, *dev)
+    case 0:
+        log.Debugf("Adding new NmonFileInfo %+v", dev)
+        return dbc.AddNmonFileInfo(*dev)
+    default:
+        log.Errorf("Error searching for NmonFileInfo %+v: found %d results", dev, len(m))
+        return 0, fmt.Errorf("Error searching for NmonFileInfo %+v: found %d results", dev, len(m))
+    }
+
+}
+
+/*UpdateNmonFileInfo updates an existing entry from its ID*/
+func (dbc *DatabaseCfg) UpdateNmonFileInfo(id string, dev NmonFileInfo) (int64, error) {
+    var affecteddev, affected int64
+    var err error
+    session := dbc.x.NewSession()
+    defer session.Close()
+    if id != dev.ID { //ID has been changed
+        affecteddev, err = session.Where("outdb='" + id + "'").Cols("outdb").Update(&HMCCfg{OutDB: dev.ID})
+        if err != nil {
+            session.Rollback()
+            return 0, fmt.Errorf("Error on Update NmonFileInfo: changing id(old) %s to id(new) %s, error: %s", id, dev.ID, err)
+        }
+        log.Infof("Updated NmonFileInfo references in %d devices", affecteddev)
+    }
+
+    affected, err = session.Where("id='" + id + "'").UseBool().AllCols().Update(dev)
+    if err != nil {
+        session.Rollback()
+        return 0, err
+    }
+    err = session.Commit()
+    if err != nil {
+        return 0, err
+    }
+
+    log.Infof("Updated NmonFileInfo successfully with id %s and data: %+v", id, dev)
+    dbc.addChanges(affected + affecteddev)
+
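One semantic worth keeping in mind when calling these accessors: GetNmonFileInfoByIDFile returns an error both on a real lookup failure and when no row exists, so callers (see ImportData above) treat any error as "no saved position". A hypothetical wrapper making that explicit:

    // resumeOrStart returns the stored offset for (id, file), or the supplied
    // fallback when no NmonFileInfo row exists (or the lookup fails).
    func resumeOrStart(dbc *DatabaseCfg, id, file string, fallback int64) int64 {
        info, err := dbc.GetNmonFileInfoByIDFile(id, file)
        if err != nil {
            return fallback
        }
        return info.LastPosition
    }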
+    return affected, nil
+}
+
+/*GetNmonFileInfoAffectOnDel for listing the objects affected when deleting an entry from its ID*/
+func (dbc *DatabaseCfg) GetNmonFileInfoAffectOnDel(id string) ([]*DbObjAction, error) {
+    var devices []*HMCCfg
+    var obj []*DbObjAction
+    if err := dbc.x.Where("outdb='" + id + "'").Find(&devices); err != nil {
+        log.Warnf("Error on Get Output db id %s for devices, error: %s", id, err)
+        return nil, err
+    }
+
+    for _, val := range devices {
+        obj = append(obj, &DbObjAction{
+            Type:     "HMCCfg",
+            TypeDesc: "HMC Devices",
+            ObID:     val.ID,
+            Action:   "Reset InfluxDB Server from HMC Device to 'default' InfluxDB Server",
+        })
+
+    }
+    return obj, nil
+}
diff --git a/pkg/data/rfile/rfile.go b/pkg/data/rfile/rfile.go
index 8f7ee94..1eafb60 100644
--- a/pkg/data/rfile/rfile.go
+++ b/pkg/data/rfile/rfile.go
@@ -3,6 +3,8 @@ package rfile
 import (
     "bufio"
     "compress/gzip"
+    "fmt"
+    "regexp"
 
     "github.com/Sirupsen/logrus"
     "github.com/pkg/sftp"
@@ -63,17 +65,19 @@ func (rf *File) GetRemoteReader() (*RemoteFileReader, error) {
             return nil, err
         }
         reader := bufio.NewReader(gr)
-        //return &RemoteFileReader{file, bufio.NewReader(reader)}, nil
         return &RemoteFileReader{file, reader}, nil
     }
     reader := bufio.NewReader(file)
-    //return &RemoteFileReader{file, bufio.NewReader(reader)}, nil
     return &RemoteFileReader{file, reader}, nil
 }
 
-//Content returns the nmon files content sorted in an slice of string format
-func (rf *File) Content() []string {
+//Content returns the nmon file content from the current position until the last written line
+func (rf *File) Content() ([]string, int64) {
+    if rf.reader == nil {
+        rf.log.Warnf("Trying to read data without an open reader")
+        return nil, 0
+    }
     var lines []string
     for {
@@ -83,6 +87,41 @@ func (rf *File) Content() ([]string, int64) {
         }
         lines = append(lines, string(line))
     }
+    pos, err := rf.reader.Seek(0, 1)
+    if err != nil {
+        rf.log.Warnf("Error getting current remote file position")
+        return lines, 0
+    }
+    return lines, pos
+}
+
+//ContentUntilMatch returns the nmon file content from the current position until one line matches the regexp
+func (rf *File) ContentUntilMatch(regex *regexp.Regexp) ([]string, int64, error) {
+    if rf.reader == nil {
+        return nil, 0, fmt.Errorf("Trying to read data without an open reader")
+    }
+
+    var lines []string
+    for {
+        line, _, err := rf.reader.ReadLine()
+        if err != nil {
+            break
+        }
+
+        lines = append(lines, string(line))
+        if regex.MatchString(string(line)) {
+            break
+        }
+    }
+    pos, err := rf.reader.Seek(0, 1)
+    if err != nil {
+        rf.log.Warnf("Error getting current remote file position")
+        return lines, 0, err
+    }
+    return lines, pos, nil
+}
 
-    return lines
+// SetPosition sets the file's current position relative to the beginning
+func (rf *File) SetPosition(newpos int64) (int64, error) {
+    return rf.reader.Seek(newpos, 0)
 }
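End to end, the rfile API now supports a tail-like pattern: open, seek to a stored offset, read complete lines, remember the new offset. A sketch, assuming New opens the remote handle with a ready reader, as NmonFile's usage suggests (sc is an established *sftp.Client):

    // tailNmon reads whatever has been appended to a remote nmon file since
    // the given offset and returns the lines plus the offset to store.
    func tailNmon(sc *sftp.Client, log *logrus.Logger, path string, offset int64) ([]string, int64, error) {
        rf := rfile.New(sc, log, path)
        defer rf.End() // close the remote handle when done
        if _, err := rf.SetPosition(offset); err != nil {
            return nil, 0, err
        }
        lines, pos := rf.Content()
        return lines, pos, nil
    }

Three details worth double-checking in review: Seek(0, 1) resolves to the underlying sftp.File, so after buffered ReadLine calls the reported offset may sit ahead of the last line actually returned if bufio has read ahead; for .gz files the offset refers to the compressed stream, so a stored position cannot simply be restored across runs for gzipped nmon files; and SetPosition dereferences rf.reader without the nil guard that Content and ContentUntilMatch have.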