Fix cdn back source range size overflow #550

Merged
merged 1 commit on Aug 15, 2021
51 changes: 27 additions & 24 deletions cdnsystem/daemon/cdn/cache_writer.go
@@ -69,23 +69,23 @@ func (cw *cacheWriter) startWriter(reader io.Reader, task *types.SeedTask, detec
curPieceNum := len(detectResult.pieceMetaRecords)
routineCount := calculateRoutineCount(task.SourceFileLength-currentSourceFileLength, task.PieceSize)
// start writer pool
-backSourceFileLength, totalPieceCount, err := cw.doWrite(reader, task, routineCount, curPieceNum)
+backSourceLength, totalPieceCount, err := cw.doWrite(reader, task, routineCount, curPieceNum)
if err != nil {
-return &downloadMetadata{backSourceLength: backSourceFileLength}, fmt.Errorf("write data: %v", err)
+return &downloadMetadata{backSourceLength: backSourceLength}, fmt.Errorf("write data: %v", err)
}
storageInfo, err := cw.cacheDataManager.statDownloadFile(task.TaskID)
if err != nil {
-return &downloadMetadata{backSourceLength: backSourceFileLength}, fmt.Errorf("stat cdn download file: %v", err)
+return &downloadMetadata{backSourceLength: backSourceLength}, fmt.Errorf("stat cdn download file: %v", err)
}
// TODO Try getting it from the ProgressManager first
pieceMd5Sign, _, err := cw.cacheDataManager.getPieceMd5Sign(task.TaskID)
if err != nil {
-return &downloadMetadata{backSourceLength: backSourceFileLength}, fmt.Errorf("get piece md5 sign: %v", err)
+return &downloadMetadata{backSourceLength: backSourceLength}, fmt.Errorf("get piece md5 sign: %v", err)
}
return &downloadMetadata{
-backSourceLength: backSourceFileLength,
+backSourceLength: backSourceLength,
realCdnFileLength: storageInfo.Size,
-realSourceFileLength: currentSourceFileLength + backSourceFileLength,
+realSourceFileLength: currentSourceFileLength + backSourceLength,
pieceTotalCount: int32(totalPieceCount),
pieceMd5Sign: pieceMd5Sign,
}, nil
@@ -97,7 +97,7 @@ func (cw *cacheWriter) doWrite(reader io.Reader, task *types.SeedTask, routineCo
return new(bytes.Buffer)
},
}
-var backSourceFileLength int64
+var backSourceLength int64
buf := make([]byte, 256*1024)
jobCh := make(chan *piece)
var wg = &sync.WaitGroup{}
@@ -106,15 +106,15 @@ func (cw *cacheWriter) doWrite(reader io.Reader, task *types.SeedTask, routineCo
var bb = bufPool.Get().(*bytes.Buffer)
bb.Reset()
limitReader := io.LimitReader(reader, int64(task.PieceSize))
-n, err := io.CopyBuffer(bb, limitReader, buf)
+n, err = io.CopyBuffer(bb, limitReader, buf)
if err != nil {
close(jobCh)
-return backSourceFileLength, 0, fmt.Errorf("read source taskID %s pieceNum %d piece: %v", task.TaskID, curPieceNum, err)
+return backSourceLength, 0, fmt.Errorf("read source taskID %s pieceNum %d piece: %v", task.TaskID, curPieceNum, err)
}
if n == 0 {
break
}
-backSourceFileLength = backSourceFileLength + n
+backSourceLength += n

jobCh <- &piece{
taskID: task.TaskID,
@@ -129,53 +129,56 @@ func (cw *cacheWriter) doWrite(reader io.Reader, task *types.SeedTask, routineCo
}
close(jobCh)
wg.Wait()
-return backSourceFileLength, curPieceNum, nil
+return backSourceLength, curPieceNum, nil
}

func (cw *cacheWriter) writerPool(wg *sync.WaitGroup, routineCount int, pieceCh chan *piece, bufPool *sync.Pool) {
wg.Add(routineCount)
for i := 0; i < routineCount; i++ {
go func() {
defer wg.Done()
-for piece := range pieceCh {
+for p := range pieceCh {
// TODO Subsequent compression and other features are implemented through waitToWriteContent and pieceStyle
-waitToWriteContent := piece.pieceContent
+waitToWriteContent := p.pieceContent
originPieceLen := waitToWriteContent.Len() // the length of the original data that has not been processed
pieceLen := originPieceLen // the real length written to the storage medium after processing
pieceStyle := types.PlainUnspecified
pieceMd5 := md5.New()
-err := cw.cacheDataManager.writeDownloadFile(piece.taskID, int64(piece.pieceNum)*int64(piece.pieceSize), int64(waitToWriteContent.Len()),
-io.TeeReader(io.LimitReader(piece.pieceContent, int64(waitToWriteContent.Len())), pieceMd5))
+err := cw.cacheDataManager.writeDownloadFile(
+p.taskID, int64(p.pieceNum)*int64(p.pieceSize), int64(waitToWriteContent.Len()),
+io.TeeReader(io.LimitReader(p.pieceContent, int64(waitToWriteContent.Len())), pieceMd5))
// Recycle Buffer
bufPool.Put(waitToWriteContent)
if err != nil {
logger.Errorf("write taskID %s pieceNum %d file: %v", piece.taskID, piece.pieceNum, err)
logger.Errorf("write taskID %s pieceNum %d file: %v", p.taskID, p.pieceNum, err)
continue
}
+start := uint64(p.pieceNum) * uint64(p.pieceSize)
+end := start + uint64(pieceLen) - 1
pieceRecord := &storage.PieceMetaRecord{
-PieceNum: piece.pieceNum,
+PieceNum: p.pieceNum,
PieceLen: int32(pieceLen),
Md5: digestutils.ToHashString(pieceMd5),
Range: &rangeutils.Range{
-StartIndex: uint64(piece.pieceNum * piece.pieceSize),
-EndIndex: uint64(piece.pieceNum*piece.pieceSize + int32(pieceLen) - 1),
+StartIndex: start,
+EndIndex: end,
},
OriginRange: &rangeutils.Range{
-StartIndex: uint64(piece.pieceNum * piece.pieceSize),
-EndIndex: uint64(piece.pieceNum*piece.pieceSize + int32(originPieceLen) - 1),
+StartIndex: start,
+EndIndex: end,
},
PieceStyle: pieceStyle,
}
// write piece meta to storage
-if err := cw.cacheDataManager.appendPieceMetaData(piece.taskID, pieceRecord); err != nil {
+if err = cw.cacheDataManager.appendPieceMetaData(p.taskID, pieceRecord); err != nil {
logger.Errorf("write piece meta file: %v", err)
continue
}

if cw.cdnReporter != nil {
-if err := cw.cdnReporter.reportPieceMetaRecord(piece.taskID, pieceRecord, DownloaderReport); err != nil {
+if err = cw.cdnReporter.reportPieceMetaRecord(p.taskID, pieceRecord, DownloaderReport); err != nil {
// NOTE: should we do this job again?
logger.Errorf("report piece status, pieceNum %d pieceMetaRecord %s: %v", piece.pieceNum, pieceRecord, err)
logger.Errorf("report piece status, pieceNum %d pieceMetaRecord %s: %v", p.pieceNum, pieceRecord, err)
}
}
}
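Why this fixes the overflow: the old code built the piece range with `uint64(piece.pieceNum * piece.pieceSize)`, so the multiplication runs in int32 and wraps once a back-source download passes the 2 GiB mark; the wrapped value is only then converted to uint64, producing a bogus range. The new `start`/`end` lines widen each factor to uint64 before multiplying. A minimal sketch of the difference, not project code; the piece index and the 4 MiB piece size are made up for illustration:

```go
package main

import "fmt"

func main() {
	var pieceNum int32 = 600              // hypothetical piece index past the 2 GiB boundary
	var pieceSize int32 = 4 * 1024 * 1024 // hypothetical 4 MiB piece size

	// Old approach: the product is evaluated in int32, wraps around,
	// and only then is converted to uint64, yielding a huge bogus offset.
	oldStart := uint64(pieceNum * pieceSize)

	// Fixed approach: widen both factors first, then multiply in uint64.
	newStart := uint64(pieceNum) * uint64(pieceSize)

	fmt.Println(oldStart) // wrapped garbage, nowhere near the real offset
	fmt.Println(newStart) // 2516582400, the correct byte offset
}
```

The same widening carries through `end := start + uint64(pieceLen) - 1`, so the Range and OriginRange indices stay in uint64 for the whole computation.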