File size: 4,462 Bytes
c19ca42
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
import os
import cv2
import torch
import numpy as np
import huggingface_hub as hf
from modules import shared, processing, sd_models, devices


REPO_ID = "InstantX/InstantID"
controlnet_model = None
debug = shared.log.trace if os.environ.get('SD_FACE_DEBUG', None) is not None else lambda *args, **kwargs: None


def instant_id(p: processing.StableDiffusionProcessing, app, source_images, strength=1.0, conditioning=0.5, cache=True): # pylint: disable=arguments-differ
    from modules.face.instantid_model import StableDiffusionXLInstantIDPipeline, draw_kps
    from diffusers.models import ControlNetModel
    global controlnet_model # pylint: disable=global-statement

    # prepare pipeline
    if source_images is None or len(source_images) == 0:
        shared.log.warning('InstantID: no input images')
        return None

    c = shared.sd_model.__class__.__name__ if shared.sd_model is not None else ''
    if c != 'StableDiffusionXLPipeline':
        shared.log.warning(f'InstantID invalid base model: current={c} required=StableDiffusionXLPipeline')
        return None

    # prepare face emb
    face_embeds = []
    face_images = []
    for i, source_image in enumerate(source_images):
        faces = app.get(cv2.cvtColor(np.array(source_image), cv2.COLOR_RGB2BGR))
        face = sorted(faces, key=lambda x:(x['bbox'][2]-x['bbox'][0])*x['bbox'][3]-x['bbox'][1])[-1]  # only use the maximum face
        face_embeds.append(torch.from_numpy(face['embedding']))
        face_images.append(draw_kps(source_image, face['kps']))
        p.extra_generation_params[f"InstantID {i+1}"] = f'{faces[0].det_score:.2f} {"female" if faces[0].gender==0 else "male"} {faces[0].age}y'
        shared.log.debug(f'InstantID face: score={face.det_score:.2f} gender={"female" if face.gender==0 else "male"} age={face.age} bbox={face.bbox}')

    shared.log.debug(f'InstantID loading: model={REPO_ID}')
    face_adapter = hf.hf_hub_download(repo_id=REPO_ID, filename="ip-adapter.bin")
    if controlnet_model is None or not cache:
        controlnet_model = ControlNetModel.from_pretrained(REPO_ID, subfolder="ControlNetModel", torch_dtype=devices.dtype, cache_dir=shared.opts.diffusers_dir)
        sd_models.move_model(controlnet_model, devices.device)

    processing.process_init(p)

    # create new pipeline
    orig_pipeline = shared.sd_model # backup current pipeline definition
    shared.sd_model = StableDiffusionXLInstantIDPipeline(
        vae = shared.sd_model.vae,
        text_encoder=shared.sd_model.text_encoder,
        text_encoder_2=shared.sd_model.text_encoder_2,
        tokenizer=shared.sd_model.tokenizer,
        tokenizer_2=shared.sd_model.tokenizer_2,
        unet=shared.sd_model.unet,
        scheduler=shared.sd_model.scheduler,
        controlnet=controlnet_model,
        force_zeros_for_empty_prompt=shared.opts.diffusers_force_zeros,
    )
    sd_models.copy_diffuser_options(shared.sd_model, orig_pipeline) # copy options from original pipeline
    sd_models.set_diffuser_options(shared.sd_model) # set all model options such as fp16, offload, etc.
    shared.sd_model.load_ip_adapter_instantid(face_adapter, scale=strength)
    shared.sd_model.set_ip_adapter_scale(strength)
    sd_models.move_model(shared.sd_model, devices.device) # move pipeline to device
    shared.sd_model.to(dtype=devices.dtype)

    # pipeline specific args
    orig_prompt_attention = shared.opts.prompt_attention
    shared.opts.data['prompt_attention'] = 'Fixed attention' # otherwise need to deal with class_tokens_mask
    p.task_args['image_embeds'] = face_embeds[0].shape # placeholder
    p.task_args['image'] = face_images[0]
    p.task_args['controlnet_conditioning_scale'] = float(conditioning)
    p.task_args['ip_adapter_scale'] = float(strength)
    shared.log.debug(f"InstantID args: {p.task_args}")
    p.task_args['prompt'] = p.all_prompts[0] # override all logic
    p.task_args['negative_prompt'] = p.all_negative_prompts[0]
    p.task_args['image_embeds'] = face_embeds[0] # overwrite placeholder

    # run processing
    processed: processing.Processed = processing.process_images(p)
    shared.sd_model.set_ip_adapter_scale(0)
    p.extra_generation_params['InstantID'] = f'{strength}/{conditioning}'

    if not cache:
        controlnet_model = None
        devices.torch_gc()

    # restore original pipeline
    shared.opts.data['prompt_attention'] = orig_prompt_attention
    shared.sd_model = orig_pipeline
    return processed