|
package fs |
|
|
|
import ( |
|
"context" |
|
"fmt" |
|
"net/http" |
|
stdpath "path" |
|
|
|
"github.com/alist-org/alist/v3/internal/conf" |
|
"github.com/alist-org/alist/v3/internal/driver" |
|
"github.com/alist-org/alist/v3/internal/model" |
|
"github.com/alist-org/alist/v3/internal/op" |
|
"github.com/alist-org/alist/v3/internal/setting" |
|
"github.com/alist-org/alist/v3/internal/stream" |
|
"github.com/alist-org/alist/v3/pkg/tache" |
|
"github.com/alist-org/alist/v3/pkg/utils" |
|
"github.com/pkg/errors" |
|
log "github.com/sirupsen/logrus" |
|
) |
|
|
|
type CopyTask struct { |
|
tache.Base |
|
Status string `json:"-"` |
|
SrcObjPath string `json:"src_path"` |
|
DstDirPath string `json:"dst_path"` |
|
Override bool `json:"override"` |
|
srcStorage driver.Driver `json:"-"` |
|
dstStorage driver.Driver `json:"-"` |
|
SrcStorageMp string `json:"src_storage_mp"` |
|
DstStorageMp string `json:"dst_storage_mp"` |
|
Size int64 `json:"size"` |
|
} |
|
|
|
func (t *CopyTask) GetName() string { |
|
return fmt.Sprintf("copy [%s](%s) to [%s](%s)", t.SrcStorageMp, t.SrcObjPath, t.DstStorageMp, t.DstDirPath) |
|
} |
|
|
|
func (t *CopyTask) GetStatus() string { |
|
return t.Status |
|
} |
|
|
|
func (t *CopyTask) SetSize(size int64) { |
|
t.Size = size |
|
} |
|
|
|
func (t *CopyTask) GetSize() int64 { |
|
return t.Size |
|
} |
|
|
|
func (t *CopyTask) OnFailed() { |
|
result := fmt.Sprintf("%s:%s", t.GetName(), t.GetErr()) |
|
log.Debug(result) |
|
if setting.GetBool(conf.NotifyEnabled) && setting.GetBool(conf.NotifyOnCopyFailed) { |
|
go op.Notify("文件复制结果", result) |
|
} |
|
} |
|
|
|
func (t *CopyTask) OnSucceeded() { |
|
result := fmt.Sprintf("复制%s到%s成功", t.SrcObjPath, t.DstDirPath) |
|
log.Debug(result) |
|
if setting.GetBool(conf.NotifyEnabled) && setting.GetBool(conf.NotifyOnCopySucceeded) { |
|
go op.Notify("文件复制结果", result) |
|
} |
|
} |
|
|
|
// humanReadableSize formats a byte count using 1024-based units.
//
// Values below 1024 (including negative values) are printed as an integer
// followed by " B". Larger values are divided by the largest power of 1024
// that does not exceed them and printed with one decimal place and a K, M,
// G, T, P or E prefix, e.g. 1536 -> "1.5 KB".
func humanReadableSize(size int64) string {
	const (
		unit     = 1024
		prefixes = "KMGTPE"
	)
	if size < unit {
		return fmt.Sprintf("%d B", size)
	}

	// Find the unit by repeatedly scaling down; idx selects the prefix and
	// divisor is the matching power of 1024.
	divisor := int64(unit)
	idx := 0
	remaining := size / unit
	for remaining >= unit {
		divisor *= unit
		idx++
		remaining /= unit
	}

	scaled := float64(size) / float64(divisor)
	return fmt.Sprintf("%.1f %cB", scaled, prefixes[idx])
}
|
|
|
func (t *CopyTask) Run() error { |
|
|
|
var err error |
|
if t.srcStorage == nil { |
|
t.srcStorage, err = op.GetStorageByMountPath(t.SrcStorageMp) |
|
} |
|
if t.dstStorage == nil { |
|
t.dstStorage, err = op.GetStorageByMountPath(t.DstStorageMp) |
|
} |
|
if err != nil { |
|
return errors.WithMessage(err, "failed get storage") |
|
} |
|
|
|
if !t.Override { |
|
srcObj, err := get(context.Background(), t.SrcStorageMp+t.SrcObjPath) |
|
if err != nil { |
|
return errors.WithMessagef(err, "failed get src [%s] file", t.SrcObjPath) |
|
} |
|
if srcObj.IsDir() { |
|
return copyBetween2Storages(t, t.srcStorage, t.dstStorage, t.SrcObjPath, t.DstDirPath) |
|
} |
|
var distSize int64 |
|
t.Size = srcObj.GetSize() |
|
dst_path := stdpath.Join(t.DstStorageMp+t.DstDirPath, srcObj.GetName()) |
|
obj, err := get(context.Background(), dst_path) |
|
if err == nil { |
|
distSize = obj.GetSize() |
|
} |
|
if err != nil || distSize != t.Size { |
|
|
|
return copyBetween2Storages(t, t.srcStorage, t.dstStorage, t.SrcObjPath, t.DstDirPath) |
|
} else { |
|
|
|
return errors.WithMessage(err, obj.GetName()+"文件已经存在") |
|
} |
|
} else { |
|
return copyBetween2Storages(t, t.srcStorage, t.dstStorage, t.SrcObjPath, t.DstDirPath) |
|
} |
|
|
|
} |
|
|
|
var CopyTaskManager *tache.Manager[*CopyTask] |
|
|
|
|
|
|
|
func _copy(ctx context.Context, SrcObjPath, DstDirPath string, overwrite bool, lazyCache ...bool) (tache.TaskWithInfo, error) { |
|
srcStorage, srcObjActualPath, err := op.GetStorageAndActualPath(SrcObjPath) |
|
if err != nil { |
|
return nil, errors.WithMessage(err, "failed get src storage") |
|
} |
|
dstStorage, dstDirActualPath, err := op.GetStorageAndActualPath(DstDirPath) |
|
if err != nil { |
|
return nil, errors.WithMessage(err, "failed get dst storage") |
|
} |
|
|
|
|
|
if srcStorage.GetStorage() == dstStorage.GetStorage() { |
|
return nil, op.Copy(ctx, srcStorage, srcObjActualPath, dstDirActualPath, lazyCache...) |
|
} |
|
if ctx.Value(conf.NoTaskKey) != nil { |
|
srcObj, err := op.Get(ctx, srcStorage, srcObjActualPath) |
|
if err != nil { |
|
return nil, errors.WithMessagef(err, "failed get src [%s] file", SrcObjPath) |
|
} |
|
if !srcObj.IsDir() { |
|
|
|
link, _, err := op.Link(ctx, srcStorage, srcObjActualPath, model.LinkArgs{ |
|
Header: http.Header{}, |
|
}) |
|
if err != nil { |
|
return nil, errors.WithMessagef(err, "failed get [%s] link", SrcObjPath) |
|
} |
|
fs := stream.FileStream{ |
|
Obj: srcObj, |
|
Ctx: ctx, |
|
} |
|
|
|
ss, err := stream.NewSeekableStream(fs, link) |
|
if err != nil { |
|
return nil, errors.WithMessagef(err, "failed get [%s] stream", SrcObjPath) |
|
} |
|
return nil, op.Put(ctx, dstStorage, dstDirActualPath, ss, nil, false) |
|
} |
|
} |
|
|
|
|
|
t := &CopyTask{ |
|
srcStorage: srcStorage, |
|
dstStorage: dstStorage, |
|
SrcObjPath: srcObjActualPath, |
|
DstDirPath: dstDirActualPath, |
|
Override: overwrite, |
|
SrcStorageMp: srcStorage.GetStorage().MountPath, |
|
DstStorageMp: dstStorage.GetStorage().MountPath, |
|
} |
|
CopyTaskManager.Add(t) |
|
return t, nil |
|
} |
|
|
|
func copyBetween2Storages(t *CopyTask, srcStorage, dstStorage driver.Driver, SrcObjPath, DstDirPath string) error { |
|
t.Status = "getting src object" |
|
srcObj, err := op.Get(t.Ctx(), srcStorage, SrcObjPath) |
|
if err != nil { |
|
return errors.WithMessagef(err, "failed get src [%s] file", SrcObjPath) |
|
} |
|
if srcObj.IsDir() { |
|
t.Status = "src object is dir, listing objs" |
|
objs, err := op.List(t.Ctx(), srcStorage, SrcObjPath, model.ListArgs{}) |
|
if err != nil { |
|
return errors.WithMessagef(err, "failed list src [%s] objs", SrcObjPath) |
|
} |
|
for _, obj := range objs { |
|
if utils.IsCanceled(t.Ctx()) { |
|
return nil |
|
} |
|
SrcObjPath := stdpath.Join(SrcObjPath, obj.GetName()) |
|
dstObjPath := stdpath.Join(DstDirPath, srcObj.GetName()) |
|
CopyTaskManager.Add(&CopyTask{ |
|
srcStorage: srcStorage, |
|
dstStorage: dstStorage, |
|
SrcObjPath: SrcObjPath, |
|
DstDirPath: dstObjPath, |
|
Override: t.Override, |
|
SrcStorageMp: srcStorage.GetStorage().MountPath, |
|
DstStorageMp: dstStorage.GetStorage().MountPath, |
|
}) |
|
} |
|
t.Status = "src object is dir, added all copy tasks of objs" |
|
return nil |
|
} |
|
return copyFileBetween2Storages(t, srcStorage, dstStorage, SrcObjPath, DstDirPath) |
|
} |
|
|
|
func copyFileBetween2Storages(tsk *CopyTask, srcStorage, dstStorage driver.Driver, srcFilePath, DstDirPath string) error { |
|
tsk.Status = fmt.Sprintf("getting src object (%s)", humanReadableSize(tsk.Size)) |
|
srcFile, err := op.Get(tsk.Ctx(), srcStorage, srcFilePath) |
|
if err != nil { |
|
return errors.WithMessagef(err, "failed get src [%s] file", srcFilePath) |
|
} |
|
link, _, err := op.Link(tsk.Ctx(), srcStorage, srcFilePath, model.LinkArgs{ |
|
Header: http.Header{}, |
|
}) |
|
if err != nil { |
|
return errors.WithMessagef(err, "failed get [%s] link", srcFilePath) |
|
} |
|
fs := stream.FileStream{ |
|
Obj: srcFile, |
|
Ctx: tsk.Ctx(), |
|
} |
|
|
|
ss, err := stream.NewSeekableStream(fs, link) |
|
if err != nil { |
|
return errors.WithMessagef(err, "failed get [%s] stream", srcFilePath) |
|
} |
|
return op.Put(tsk.Ctx(), dstStorage, DstDirPath, ss, tsk.SetProgress, true) |
|
} |
|
|