File size: 4,413 Bytes
c19ca42
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
#!/usr/bin/env python
import os
import io
import pathlib
import argparse
import filetype
import numpy as np
from imwatermark import WatermarkEncoder, WatermarkDecoder
from PIL import Image
from PIL.ExifTags import TAGS
from PIL.TiffImagePlugin import ImageFileDirectory_v2
from util import log, Map
import piexif
import piexif.helper


# shared imwatermark settings used by every function below:
# `method` is the algorithm name handed to encoder.encode() / decoder.decode(),
# `type` is the payload kind handed to WatermarkDecoder() and encoder.set_watermark()
options = Map({ 'method': 'dwtDctSvd', 'type': 'bytes' })


def get_exif(image):
    """Collect EXIF metadata from an image as a ``{tag_name: value}`` dict.

    Two readers are combined; on duplicate tag names the pillow value wins:

    1. piexif: entries of the ``Exif`` IFD found in ``image.info["exif"]``,
       decoded with ``piexif.helper.UserComment.load``. Entries that cannot be
       decoded that way (or whose id has no name in ``piexif.ExifIFD``) are
       skipped individually so they do not hide the remaining entries.
    2. pillow: every ``image.getexif()`` entry whose numeric id is in ``TAGS``.

    Extraction is best-effort: a failure in either reader contributes no
    entries from that reader instead of raising.

    Args:
        image: object exposing ``info`` (mapping that may hold raw ``"exif"``
            bytes) and ``getexif()``, e.g. an opened ``PIL.Image.Image``.

    Returns:
        dict mapping tag names to values; empty when nothing could be read.
    """
    # using piexif
    res1 = {}
    try:
        exif = piexif.load(image.info["exif"]).get("Exif", {})
        # build the id -> name table once instead of re-scanning the class for every tag
        names = {}
        for name, tag_id in vars(piexif.ExifIFD).items():
            if not name.startswith('_'):  # ignore class internals such as __module__
                names.setdefault(tag_id, name)  # first name wins, same as list.index()
        for k, v in exif.items():
            try:
                res1[names[k]] = piexif.helper.UserComment.load(v)
            except (KeyError, TypeError, ValueError):
                # unknown tag id, or a value that is not a user-comment payload:
                # skip this tag only, keep processing the rest
                continue
    except Exception:
        pass  # deliberate best-effort: missing or malformed exif must not abort the caller
    # using pillow
    res2 = {}
    try:
        res2 = { TAGS[k]: v for k, v in image.getexif().items() if k in TAGS }
    except Exception:
        pass  # deliberate best-effort, see above
    return {**res1, **res2}


def set_exif(d: dict):
    """Serialize a ``{tag_name: value}`` mapping into a raw EXIF byte string.

    Tag names are translated back to numeric ids through pillow's ``TAGS``
    table and written into an ``ImageFileDirectory_v2``.

    Args:
        d: mapping of EXIF tag names to values.

    Returns:
        bytes: the saved directory prefixed with the ``Exif\\x00\\x00`` marker.

    Raises:
        KeyError: if a name in ``d`` is not one of the names in ``TAGS``.
    """
    name_to_id = {name: tag_id for tag_id, name in TAGS.items()}  # invert pillow's id -> name table
    directory = ImageFileDirectory_v2()
    for tag_name, value in d.items():
        directory[name_to_id[tag_name]] = value
    buffer = io.BytesIO()
    directory.save(buffer)
    return b'Exif\x00\x00' + buffer.getvalue()


def get_watermark(image, params):
    """Decode the invisible watermark carried by an image.

    Args:
        image: image convertible with ``np.asarray`` (e.g. a PIL image).
        params: object whose ``length`` attribute is the watermark length in bits.

    Returns:
        str: the decoded payload as ASCII text; bytes that are not valid ASCII
        are dropped.
    """
    pixels = np.asarray(image)
    raw = WatermarkDecoder(options.type, params.length).decode(pixels, options.method)
    return raw.decode(encoding='ascii', errors='ignore')


def set_watermark(image, params):
    """Embed ``params.wm`` into an image as an invisible watermark.

    The payload is always exactly ``params.length // 8`` bytes: the text is
    converted to ASCII (characters outside ASCII are dropped), then padded
    with spaces or truncated to that size.

    Args:
        image: image convertible with ``np.asarray`` (e.g. a PIL image).
        params: object with ``wm`` (watermark text) and ``length`` (watermark
            length in bits).

    Returns:
        a new PIL image built from the encoded pixel array.
    """
    data = np.asarray(image)
    length = params.length // 8
    # encode BEFORE padding: dropping non-ASCII characters after padding would
    # leave the payload shorter than the size the decoder is told to expect
    payload = params.wm.encode(encoding='ascii', errors='ignore')
    payload = payload.ljust(length, b' ')[:length]
    encoder = WatermarkEncoder()
    encoder.set_watermark(options.type, payload)
    encoded = encoder.encode(data, options.method)
    return Image.fromarray(encoded)


def watermark(params, file):
    """Read or write the invisible watermark of a single image file.

    Problems (missing file, non-image file, image smaller than 256*256 pixels,
    unknown command) are reported through ``log.error`` and the function
    returns without doing anything else. On success the file name, resolution,
    watermark and exif data are reported through ``log.info``.

    Args:
        params: parsed options; the attributes used are ``command`` ('read' or
            'write'), ``wm``, ``length``, ``strip``, ``verify`` and ``output``.
        file: path of the image to process.

    Returns:
        None.
    """
    if not os.path.exists(file):
        log.error({ 'watermark': 'file not found' })
        return
    if not filetype.is_image(file):
        log.error({ 'watermark': 'file is not an image' })
        return

    # the context manager releases the source file handle before anything is
    # written back, which matters when the file is overwritten in place
    with Image.open(file) as image:
        if image.width * image.height < 256 * 256:
            log.error({ 'watermark': 'image too small' })
            return
        exif = get_exif(image)
        if params.command == 'read':
            wm = get_watermark(image, params)
            result = image
        elif params.command == 'write':
            result = set_watermark(image, params)  # pixel data is read while the file is open
        else:
            log.error({ 'watermark': 'unknown command' })
            return
        resolution = f'{result.width}x{result.height}'

    fn = file  # report the file actually processed, not the whole input list
    if params.command == 'write':
        metadata = b'' if params.strip else set_exif(exif)
        target = file
        if params.output != '' and os.path.isabs(file):
            # os.path.join() would discard the output folder for an absolute
            # path and silently overwrite the source image
            target = os.path.basename(file)
        fn = os.path.join(params.output, target)
        folder = os.path.dirname(fn)
        if folder != '':
            # files found by a directory walk keep their sub-folders below the
            # output folder, so create the full parent path
            pathlib.Path(folder).mkdir(parents=True, exist_ok=True)
        result.save(fn, exif=metadata)

        if params.verify:
            # decode from the file on disk to confirm the watermark survived saving
            with Image.open(fn) as saved:
                wm = get_watermark(saved, params)
                resolution = f'{saved.width}x{saved.height}'
        else:
            wm = params.wm

    log.info({ 'file': fn })
    log.info({ 'resolution': resolution })
    log.info({ 'watermark': wm })
    log.info({ 'exif': None if params.strip else exif })


if __name__ == '__main__':
    # command-line entry point: every input is either a single image or a
    # folder that is walked recursively, and each file found is passed to watermark()
    parser = argparse.ArgumentParser(description = 'image watermarking')
    parser.add_argument('command', choices = ['read', 'write'])
    parser.add_argument('--wm', type=str, required=False, default='sdnext', help='watermark string')
    parser.add_argument('--strip', default=False, action='store_true', help = "strip existing exif data")
    parser.add_argument('--verify', default=False, action='store_true', help = "verify watermark during write")
    parser.add_argument('--length', type=int, default=32, help="watermark length in bits")
    parser.add_argument('--output', type=str, required=False, default='', help='folder to store images, default is overwrite in-place')
    parser.add_argument('input', type=str, nargs='*')
    args = parser.parse_args()
    # log.info({ 'watermark args': vars(args), 'options': options })
    for arg in args.input:
        if os.path.isfile(arg):
            watermark(args, arg)
        elif os.path.isdir(arg):
            for root, _dirs, files in os.walk(arg):
                for f in files:
                    watermark(args, os.path.join(root, f))
        else:
            # neither a file nor a folder: report it instead of skipping silently
            log.error({ 'watermark': 'file not found', 'file': arg })