File size: 7,360 Bytes
7107f0b |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 |
package fs
import (
"context"
"fmt"
"net/http"
stdpath "path"
"github.com/alist-org/alist/v3/internal/conf"
"github.com/alist-org/alist/v3/internal/driver"
"github.com/alist-org/alist/v3/internal/model"
"github.com/alist-org/alist/v3/internal/op"
"github.com/alist-org/alist/v3/internal/setting"
"github.com/alist-org/alist/v3/internal/stream"
"github.com/alist-org/alist/v3/pkg/tache"
"github.com/alist-org/alist/v3/pkg/utils"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
)
type CopyTask struct {
tache.Base
Status string `json:"-"` //don't save status to save space
SrcObjPath string `json:"src_path"`
DstDirPath string `json:"dst_path"`
Override bool `json:"override"`
srcStorage driver.Driver `json:"-"`
dstStorage driver.Driver `json:"-"`
SrcStorageMp string `json:"src_storage_mp"`
DstStorageMp string `json:"dst_storage_mp"`
Size int64 `json:"size"`
}
func (t *CopyTask) GetName() string {
return fmt.Sprintf("copy [%s](%s) to [%s](%s)", t.SrcStorageMp, t.SrcObjPath, t.DstStorageMp, t.DstDirPath)
}
func (t *CopyTask) GetStatus() string {
return t.Status
}
func (t *CopyTask) SetSize(size int64) {
t.Size = size
}
func (t *CopyTask) GetSize() int64 {
return t.Size
}
func (t *CopyTask) OnFailed() {
result := fmt.Sprintf("%s:%s", t.GetName(), t.GetErr())
log.Debug(result)
if setting.GetBool(conf.NotifyEnabled) && setting.GetBool(conf.NotifyOnCopyFailed) {
go op.Notify("文件复制结果", result)
}
}
func (t *CopyTask) OnSucceeded() {
result := fmt.Sprintf("复制%s到%s成功", t.SrcObjPath, t.DstDirPath)
log.Debug(result)
if setting.GetBool(conf.NotifyEnabled) && setting.GetBool(conf.NotifyOnCopySucceeded) {
go op.Notify("文件复制结果", result)
}
}
func humanReadableSize(size int64) string {
const unit = 1024
if size < unit {
return fmt.Sprintf("%d B", size)
}
div, exp := int64(unit), 0
for n := size / unit; n >= unit; n /= unit {
div *= unit
exp++
}
return fmt.Sprintf("%.1f %cB", float64(size)/float64(div), "KMGTPE"[exp])
}
func (t *CopyTask) Run() error {
var err error
if t.srcStorage == nil {
t.srcStorage, err = op.GetStorageByMountPath(t.SrcStorageMp)
}
if t.dstStorage == nil {
t.dstStorage, err = op.GetStorageByMountPath(t.DstStorageMp)
}
if err != nil {
return errors.WithMessage(err, "failed get storage")
}
if !t.Override {
srcObj, err := get(context.Background(), t.SrcStorageMp+t.SrcObjPath)
if err != nil {
return errors.WithMessagef(err, "failed get src [%s] file", t.SrcObjPath)
}
if srcObj.IsDir() {
return copyBetween2Storages(t, t.srcStorage, t.dstStorage, t.SrcObjPath, t.DstDirPath)
}
var distSize int64
t.Size = srcObj.GetSize()
dst_path := stdpath.Join(t.DstStorageMp+t.DstDirPath, srcObj.GetName())
obj, err := get(context.Background(), dst_path)
if err == nil {
distSize = obj.GetSize()
}
if err != nil || distSize != t.Size {
//文件不存在或者大小不一样,直接复制
return copyBetween2Storages(t, t.srcStorage, t.dstStorage, t.SrcObjPath, t.DstDirPath)
} else {
//文件已经存在,直接返回完成
return errors.WithMessage(err, obj.GetName()+"文件已经存在")
}
} else {
return copyBetween2Storages(t, t.srcStorage, t.dstStorage, t.SrcObjPath, t.DstDirPath)
}
}
var CopyTaskManager *tache.Manager[*CopyTask]
// Copy if in the same storage, call move method
// if not, add copy task
func _copy(ctx context.Context, SrcObjPath, DstDirPath string, overwrite bool, lazyCache ...bool) (tache.TaskWithInfo, error) {
srcStorage, srcObjActualPath, err := op.GetStorageAndActualPath(SrcObjPath)
if err != nil {
return nil, errors.WithMessage(err, "failed get src storage")
}
dstStorage, dstDirActualPath, err := op.GetStorageAndActualPath(DstDirPath)
if err != nil {
return nil, errors.WithMessage(err, "failed get dst storage")
}
// copy if in the same storage, just call driver.Copy
if srcStorage.GetStorage() == dstStorage.GetStorage() {
return nil, op.Copy(ctx, srcStorage, srcObjActualPath, dstDirActualPath, lazyCache...)
}
if ctx.Value(conf.NoTaskKey) != nil {
srcObj, err := op.Get(ctx, srcStorage, srcObjActualPath)
if err != nil {
return nil, errors.WithMessagef(err, "failed get src [%s] file", SrcObjPath)
}
if !srcObj.IsDir() {
// copy file directly
link, _, err := op.Link(ctx, srcStorage, srcObjActualPath, model.LinkArgs{
Header: http.Header{},
})
if err != nil {
return nil, errors.WithMessagef(err, "failed get [%s] link", SrcObjPath)
}
fs := stream.FileStream{
Obj: srcObj,
Ctx: ctx,
}
// any link provided is seekable
ss, err := stream.NewSeekableStream(fs, link)
if err != nil {
return nil, errors.WithMessagef(err, "failed get [%s] stream", SrcObjPath)
}
return nil, op.Put(ctx, dstStorage, dstDirActualPath, ss, nil, false)
}
}
// not in the same storage
t := &CopyTask{
srcStorage: srcStorage,
dstStorage: dstStorage,
SrcObjPath: srcObjActualPath,
DstDirPath: dstDirActualPath,
Override: overwrite,
SrcStorageMp: srcStorage.GetStorage().MountPath,
DstStorageMp: dstStorage.GetStorage().MountPath,
}
CopyTaskManager.Add(t)
return t, nil
}
func copyBetween2Storages(t *CopyTask, srcStorage, dstStorage driver.Driver, SrcObjPath, DstDirPath string) error {
t.Status = "getting src object"
srcObj, err := op.Get(t.Ctx(), srcStorage, SrcObjPath)
if err != nil {
return errors.WithMessagef(err, "failed get src [%s] file", SrcObjPath)
}
if srcObj.IsDir() {
t.Status = "src object is dir, listing objs"
objs, err := op.List(t.Ctx(), srcStorage, SrcObjPath, model.ListArgs{})
if err != nil {
return errors.WithMessagef(err, "failed list src [%s] objs", SrcObjPath)
}
for _, obj := range objs {
if utils.IsCanceled(t.Ctx()) {
return nil
}
SrcObjPath := stdpath.Join(SrcObjPath, obj.GetName())
dstObjPath := stdpath.Join(DstDirPath, srcObj.GetName())
CopyTaskManager.Add(&CopyTask{
srcStorage: srcStorage,
dstStorage: dstStorage,
SrcObjPath: SrcObjPath,
DstDirPath: dstObjPath,
Override: t.Override,
SrcStorageMp: srcStorage.GetStorage().MountPath,
DstStorageMp: dstStorage.GetStorage().MountPath,
})
}
t.Status = "src object is dir, added all copy tasks of objs"
return nil
}
return copyFileBetween2Storages(t, srcStorage, dstStorage, SrcObjPath, DstDirPath)
}
func copyFileBetween2Storages(tsk *CopyTask, srcStorage, dstStorage driver.Driver, srcFilePath, DstDirPath string) error {
tsk.Status = fmt.Sprintf("getting src object (%s)", humanReadableSize(tsk.Size))
srcFile, err := op.Get(tsk.Ctx(), srcStorage, srcFilePath)
if err != nil {
return errors.WithMessagef(err, "failed get src [%s] file", srcFilePath)
}
link, _, err := op.Link(tsk.Ctx(), srcStorage, srcFilePath, model.LinkArgs{
Header: http.Header{},
})
if err != nil {
return errors.WithMessagef(err, "failed get [%s] link", srcFilePath)
}
fs := stream.FileStream{
Obj: srcFile,
Ctx: tsk.Ctx(),
}
// any link provided is seekable
ss, err := stream.NewSeekableStream(fs, link)
if err != nil {
return errors.WithMessagef(err, "failed get [%s] stream", srcFilePath)
}
return op.Put(tsk.Ctx(), dstStorage, DstDirPath, ss, tsk.SetProgress, true)
}
|