Skip to content

Commit

Permalink
Handle stdio is empty when download
Browse files Browse the repository at this point in the history
  • Loading branch information
hustcat committed Nov 23, 2016
1 parent 6fe57d4 commit 4476aca
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 70 deletions.
54 changes: 28 additions & 26 deletions bt/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ type idInfo struct {
Count int
}

//the Engine Cloud Torrent engine, backed by anacrolix/torrent
// BtEngine backed by anacrolix/torrent
type BtEngine struct {
mut sync.Mutex

Expand Down Expand Up @@ -128,31 +128,32 @@ func (e *BtEngine) Run() error {
// for StartSeed
e.started = true

if files, err := ioutil.ReadDir(e.dataDir); err == nil {
for _, f := range files {
if filepath.Ext(f.Name()) != ".layer" {
continue
}
ss := strings.Split(f.Name(), ".")
if len(ss) != 2 {
log.Errorf("Found invalid layer file %s", f.Name())
continue
}

id := ss[0]
tf := e.GetTorrentFilePath(id)
if _, err = os.Lstat(tf); err != nil {
continue
}

if err = e.StartSeed(id); err != nil {
log.Errorf("Start seed %s failed: %v", id, err)
}
}
} else {
files, err := ioutil.ReadDir(e.dataDir)
if err != nil {
return err
}

for _, f := range files {
if filepath.Ext(f.Name()) != ".layer" {
continue
}
ss := strings.Split(f.Name(), ".")
if len(ss) != 2 {
log.Errorf("Found invalid layer file %s", f.Name())
continue
}

id := ss[0]
tf := e.GetTorrentFilePath(id)
if _, err = os.Lstat(tf); err != nil {
continue
}

if err = e.StartSeed(id); err != nil {
log.Errorf("Start seed %s failed: %v", id, err)
}
}

return nil
}

Expand Down Expand Up @@ -297,7 +298,8 @@ func (e *BtEngine) StartSeed(id string) error {

tf := e.GetTorrentFilePath(id)
if _, err := os.Lstat(tf); err != nil {
// Torrent file no exist, create it
// Torrent file not exist, create it
log.Debugf("Create torrent file for %s", id)
if err = e.createTorrent(id); err != nil {
return err
}
Expand All @@ -318,9 +320,9 @@ func (e *BtEngine) StartSeed(id string) error {
<-t.tt.GotInfo()
err = e.startTorrent(t.InfoHash)
if err != nil {
log.Errorf("start torrent %v failed: %v", t.InfoHash, err)
log.Errorf("Start torrent %v failed: %v", t.InfoHash, err)
} else {
log.Infof("start torrent %v success", t.InfoHash)
log.Infof("Start torrent %v success", t.InfoHash)
}
}()

Expand Down
82 changes: 38 additions & 44 deletions daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,29 +75,36 @@ func (daemon *Daemon) StartDownload(ctx context.Context, r *types.StartDownloadR
context.WithValue(ctx, passwordKey, r.Password)
}

var (
reportWriter io.WriteCloser
err error
)
if r.Stdout != "" {
reportWriter, err = os.OpenFile(r.Stdout, syscall.O_WRONLY, 0)
if err != nil {
return nil, err
}
defer func() {
reportWriter.Close()
}()
}
writeReport := func(f string, a ...interface{}) {
if reportWriter != nil {
fmt.Fprintf(reportWriter, f, a...)
}
}

if daemon.config.BtSeeder {
return daemon.startSeederDownload(ctx, r)
return daemon.startSeederDownload(ctx, r.Source, reportWriter, writeReport)
} else {
return daemon.startLeecherDownload(ctx, r)
return daemon.startLeecherDownload(ctx, r.Source, reportWriter, writeReport)
}
}

func (daemon *Daemon) startSeederDownload(ctx context.Context, r *types.StartDownloadRequest) (*types.StartDownloadResponse, error) {
func (daemon *Daemon) startSeederDownload(ctx context.Context, source string, reportWriter io.Writer, writeReport func(f string, a ...interface{})) (*types.StartDownloadResponse, error) {
sysCtx := daemon.getSystemContext(ctx)

fw, err := os.OpenFile(r.Stdout, syscall.O_WRONLY, 0)
if err != nil {
return nil, err
}
defer func() {
fw.Close()
}()

writeReport := func(f string, a ...interface{}) {
fmt.Fprintf(fw, f, a...)
}

imageSource := r.Source
imageSource := source
if imageSource == "" {
return nil, fmt.Errorf("Image source can't be nil")
}
Expand Down Expand Up @@ -141,7 +148,7 @@ func (daemon *Daemon) startSeederDownload(ctx context.Context, r *types.StartDow
}

writeReport("Copying layer %s\n", layer.Digest)
if err = daemon.copyLayer(ctx, ociImg, src, layer, fw); err != nil {
if err = daemon.copyLayer(ctx, ociImg, src, layer, reportWriter); err != nil {
log.Errorf("Error copy layer %s: %v", layer.Digest, err)
return nil, err
} else {
Expand All @@ -166,11 +173,13 @@ func (daemon *Daemon) copyLayer(ctx context.Context, ociImg *OciImage, src image
}
defer srcStream.Close()

bar := utils.NewProgressBar(int(srcInfo.Size), reportWriter)
bar.Start()
if reportWriter != nil {
bar := utils.NewProgressBar(int(srcInfo.Size), reportWriter)
bar.Start()

srcStream = bar.NewProxyReader(srcStream)
defer fmt.Fprint(reportWriter, "\n")
srcStream = bar.NewProxyReader(srcStream)
defer fmt.Fprint(reportWriter, "\n")
}

digest, _, err := ociImg.layout.PutBlob(ctx, srcStream)
if err != nil {
Expand Down Expand Up @@ -217,32 +226,14 @@ func (daemon *Daemon) startSeedingLayer(ctx context.Context, ociImg *OciImage, d
return nil
}

func (daemon *Daemon) startLeecherDownload(ctx context.Context, r *types.StartDownloadRequest) (*types.StartDownloadResponse, error) {
func (daemon *Daemon) startLeecherDownload(ctx context.Context, source string, reportWriter io.Writer, writeReport func(f string, a ...interface{})) (*types.StartDownloadResponse, error) {
sysCtx := daemon.getSystemContext(ctx)

fw, err := os.OpenFile(r.Stdout, syscall.O_WRONLY, 0)
if err != nil {
return nil, err
}
defer func() {
fw.Close()
}()

writeReport := func(f string, a ...interface{}) {
fmt.Fprintf(fw, f, a...)
}

imageSource := r.Source
imageSource := source
if imageSource == "" {
return nil, fmt.Errorf("Image source cannot be empty")
}

policyContext, err := daemon.getPolicyContext()
if err != nil {
return nil, fmt.Errorf("Error loading trust policy: %v", err)
}
defer policyContext.Destroy()

srcRef, err := transports.ParseImageName(imageSource)
if err != nil {
return nil, fmt.Errorf("Invalid source name %s: %v", imageSource, err)
Expand Down Expand Up @@ -276,7 +267,7 @@ func (daemon *Daemon) startLeecherDownload(ctx context.Context, r *types.StartDo
continue
}

err = daemon.startLeechingLayer(ctx, ociImg, srcRef, layer, writeReport, fw)
err = daemon.startLeechingLayer(ctx, ociImg, srcRef, layer, writeReport, reportWriter)
if err != nil {
// FIXME: download from image source
return nil, err
Expand Down Expand Up @@ -318,7 +309,7 @@ func (daemon *Daemon) startLeecherDownload(ctx context.Context, r *types.StartDo
return &types.StartDownloadResponse{}, nil
}

func (daemon *Daemon) startLeechingLayer(ctx context.Context, ociImg *OciImage, ref imagetypes.ImageReference, layer imagetypes.BlobInfo, writeReport func(f string, a ...interface{}), fw io.Writer) error {
func (daemon *Daemon) startLeechingLayer(ctx context.Context, ociImg *OciImage, ref imagetypes.ImageReference, layer imagetypes.BlobInfo, writeReport func(f string, a ...interface{}), reportWriter io.Writer) error {
id := distdigests.Digest(layer.Digest).Hex()

log.Debugf("Start leeching layer %s", id)
Expand All @@ -329,7 +320,10 @@ func (daemon *Daemon) startLeechingLayer(ctx context.Context, ociImg *OciImage,
return err
}

progress := bt.NewProgressDownload(id, int(layer.Size), fw)
var progress *bt.ProgressDownload
if reportWriter != nil {
progress = bt.NewProgressDownload(id, int(layer.Size), reportWriter)
}
// Download layer file
if err := daemon.btEngine.StartLeecher(id, t, progress); err != nil {
log.Errorf("Download layer %s failed: %v", id, err)
Expand Down

0 comments on commit 4476aca

Please # to comment.