Spaces:
Runtime error
Runtime error
from Crypto.PublicKey import RSA | |
from Crypto.Random import get_random_bytes | |
from Crypto.Cipher import AES, PKCS1_OAEP | |
from Crypto.Hash import RIPEMD160, SHA256 | |
from Crypto.Signature import pkcs1_15 | |
import binascii | |
import base58 | |
import base64 | |
import cv2 | |
import numpy as np | |
import os | |
import uuid as uniq | |
import qrcode as qr | |
import math | |
from PIL import ImageFont, ImageDraw, Image | |
def process(img,font_text,font_fac,font_x,font_y,font_col,font_op):
    """Draw one horizontally centred line of text over a copy of ``img``.

    Args:
        img: PIL image to annotate; it is converted to RGBA first.
        font_text: Value to render (formatted into a string).
        font_fac: Font size; truncated to ``int`` before use.
        font_x: X coordinate the text is centred on.
        font_y: Y coordinate passed to ``ImageDraw.text`` for the text.
        font_col: Colour as a ``"#RRGGBB"`` hex string.
        font_op: Alpha value used for the text fill.

    Returns:
        A new RGBA image with the text composited over ``img``.
    """
    # Convert in memory rather than saving to / reloading from a fixed
    # "tmp.png", which concurrent calls would overwrite.
    base = img.convert("RGBA")
    overlay = Image.new('RGBA', base.size, (255,255,255,0))

    print (f'FONT COLOR: {font_col}')
    hex_digits = font_col.strip("#")
    rgb_tup = tuple(int(hex_digits[i:i+2], 16) for i in (0, 2, 4))
    print (rgb_tup)
    red, green, blue = rgb_tup
    t_fill = (red, green, blue, font_op)
    print (f'FONT COLOR: {t_fill}')

    x = int(font_x)
    y = int(font_y)
    draw = ImageDraw.Draw(overlay)
    text = f'{font_text}'
    font = ImageFont.truetype("./fonts/SansitaOne.ttf", int(font_fac))

    # ImageFont.getsize() was removed in Pillow 10. The right edge of
    # getbbox() is the same width getsize() used to report.
    if hasattr(font, "getbbox"):
        text_width = font.getbbox(text)[2]
    else:
        text_width = font.getsize(text)[0]

    draw.text((x-text_width/2, y),text, font = font, fill=t_fill)
    return Image.alpha_composite(base, overlay)
def textover(im,txt1="",txt2=""):
    """Open an image and stamp two black text lines on it via ``process``.

    ``txt1`` is drawn at y=0 with the larger font and ``txt2`` near the
    bottom edge with the smaller font; both are centred on the image's
    horizontal midpoint.

    Args:
        im: Anything ``Image.open`` accepts.
        txt1: Text for the top line.
        txt2: Text for the bottom line.

    Returns:
        The image returned by the second ``process`` call.
    """
    picture = Image.open(im)

    scale = 1
    top_y = 0
    bottom_margin = 25
    heading_size = 30
    caption_size = 10
    if scale > 0:
        heading_size += scale * 2
        caption_size += scale * 2
        bottom_margin += int(scale/2)

    width, height = picture.size
    print (width)
    print (height)
    centre_x = (width/2)
    caption_y = height-bottom_margin

    with_heading = process(
        picture, txt1,
        font_fac=heading_size, font_x=centre_x, font_y=top_y,
        font_col="#000000", font_op=255,
    )
    return process(
        with_heading, txt2,
        font_fac=caption_size, font_x=centre_x, font_y=caption_y,
        font_col="#000000", font_op=255,
    )
############### qr ###################### | |
def make_qr(txt=None,data=None,im_size=None,color_f=None,color_b=None):
    """Render a QR code from text, an image thumbnail, or both.

    Args:
        txt: Text to embed; ``None`` or ``""`` means "no text".
        data: Path of an image to thumbnail and embed as base64, or ``None``.
        im_size: Maximum thumbnail edge in pixels (used only with ``data``).
        color_f: Foreground colour; defaults to ``"#000"``.
        color_b: Background colour; defaults to ``"#fff"``.

    Returns:
        The filename of the PNG that was written: ``"im.png"`` (text and
        image), ``"im1.png"`` (image only) or ``"im2.png"`` (text only).
        ``None`` when neither text nor image was supplied.
    """
    qrm = qr.QRCode(box_size=10,error_correction=qr.constants.ERROR_CORRECT_H)
    if color_f is None:
        color_f = "#000"
    if color_b is None:
        color_b = "#fff"

    def _thumbnail_b64(tmp_name):
        # Shrink the source image, save it under tmp_name and return the
        # base64 of the saved file; both handles are closed on exit.
        with Image.open(f'{data}') as picture:
            picture.thumbnail((im_size,im_size))
            picture.save(tmp_name)
        with open(tmp_name,'rb') as saved:
            return base64.b64encode(saved.read())

    has_txt = txt is not None and txt != ""
    has_data = data is not None

    # NOTE(review): bytes values are interpolated with their b'...' repr,
    # exactly as before, so existing QR payloads keep the same format.
    if has_txt and has_data:
        encoded = _thumbnail_b64("tmp.jpg")
        out = f'{txt}+++{encoded}'
        print (f'txt+data {out}')
        target = "im.png"
    elif has_data:
        # Previously `txt == None or txt == "" and data != None` parsed as
        # `txt == None or (...)`, so txt=None with data=None tried to open
        # a file literally named "None".
        encoded = _thumbnail_b64("tmp1.jpg")
        out = f'+++{encoded}'
        print (f'data {out}')
        target = "im1.png"
    elif has_txt:
        out = f'{txt}'
        print (f'txt {out}')
        target = "im2.png"
    else:
        return None

    qrm.add_data(out)
    qrm.make(fit=True)
    img1 = qrm.make_image(fill_color=color_f, back_color=color_b)
    img1.save(target)
    return target
############ stegan #################### | |
def to_bin(data):
    """Return `data` rendered as zero-padded 8-bit binary text.

    ``str`` and ``bytes`` give one concatenated bit string, an
    ``np.ndarray`` gives a list with one bit string per element, and an
    ``int`` / ``np.uint8`` gives a single bit string. Any other type
    raises ``TypeError``.
    """
    def _byte_bits(value):
        return format(value, "08b")

    if isinstance(data, str):
        return ''.join(_byte_bits(ord(char)) for char in data)
    if isinstance(data, bytes):
        return ''.join(_byte_bits(octet) for octet in data)
    if isinstance(data, np.ndarray):
        return [_byte_bits(element) for element in data]
    if isinstance(data, (int, np.uint8)):
        return _byte_bits(data)
    raise TypeError("Type not supported.")
def decode(image_name,txt=None):
    """Recover the Python literal hidden in an image's least-significant bits.

    The LSB of every R, G and B value (row by row) is collected, grouped
    into 8-bit characters, cut after the first ``"====="`` terminator, and
    everything before the first ``"#####"`` delimiter is parsed.

    Args:
        image_name: Path of the image, read with ``cv2.imread``.
        txt: Unused; kept for interface compatibility.

    Returns:
        The value of the embedded literal.

    Raises:
        ValueError, SyntaxError: If the extracted text is not a valid
            Python literal.
    """
    # Local import so this block does not depend on the file's import list.
    import ast

    BGRimage = cv2.imread(image_name)
    image = cv2.cvtColor(BGRimage, cv2.COLOR_BGR2RGB)

    # One vectorised pass replaces the per-pixel to_bin() loop and the
    # quadratic string concatenation.
    bits = (image & 1).reshape(-1)
    whole = bits.size - bits.size % 8
    decoded_data = np.packbits(bits[:whole]).tobytes().decode("latin-1")
    if whole < bits.size:
        # The trailing partial group is still decoded as one short
        # binary number, as before.
        leftover = "".join(str(int(bit)) for bit in bits[whole:])
        decoded_data += chr(int(leftover, 2))

    terminator_at = decoded_data.find("=====")
    if terminator_at != -1:
        decoded_data = decoded_data[:terminator_at + 5]
    # Without a terminator the last five characters are still dropped and
    # the "#####" delimiter alone bounds the payload.
    payload = decoded_data[:-5].split("#####",1)[0]

    # SECURITY: this text comes from an arbitrary image. It used to go
    # through eval(), which would execute embedded code; literal_eval only
    # accepts plain literals such as the b'...' payloads encode() writes.
    return ast.literal_eval(payload)
def encode(image_name, secret_data,txt=None):
    """Hide ``secret_data`` in the least-significant bits of an image.

    The text form of ``secret_data`` (``f'{secret_data}'``, i.e. the
    ``b'...'`` repr for bytes) is followed by as many ``"#####"`` padding
    groups as fit and then the ``"====="`` terminator. Each bit of that
    message is written into the LSB of successive R, G, B values.

    Args:
        image_name: Path of the carrier image, read with ``cv2.imread``.
        secret_data: Payload to hide.
        txt: Unused; kept for interface compatibility.

    Returns:
        The modified image as an RGB ``numpy`` array.

    Raises:
        ValueError: If payload plus terminator exceed the image capacity.
            The original branch referenced an undefined ``gr`` name and
            returned a differently shaped value.
    """
    BGRimage = cv2.imread(image_name)
    image = np.ascontiguousarray(cv2.cvtColor(BGRimage, cv2.COLOR_BGR2RGB))
    n_bytes = image.shape[0] * image.shape[1] * 3 // 8
    print("[*] Maximum bytes to encode:", n_bytes)

    # Always take the text form up front; previously this happened only as
    # a side effect of padding, so unpadded bytes crashed on `bytes + str`.
    payload = f'{secret_data}'
    terminator = "====="
    spare = n_bytes - len(payload) - len(terminator)
    if spare < 0:
        raise ValueError(
            f"Secret data needs {len(payload) + len(terminator)} bytes "
            f"but the image can hold only {n_bytes}"
        )

    # Size the padding directly so the terminator always fits; the old
    # append-five-at-a-time loop was quadratic and could push the
    # terminator past the end of the image.
    message = payload + "#####" * (spare // 5) + terminator

    bit_text = to_bin(message)
    bits = np.frombuffer(bit_text.encode("ascii"), dtype=np.uint8) - np.uint8(ord("0"))

    # `image` is contiguous, so this reshape is a view and the write below
    # lands in `image` itself.
    flat = image.reshape(-1)
    count = min(bits.size, flat.size)
    flat[:count] = (flat[:count] & np.uint8(0xFE)) | bits[:count]
    return image
def conv_im(im,data):
    """Upscale an image if needed and hide ``data`` in it via ``encode``.

    The target edge is ``int(sqrt(len(data) * 4)) + 100`` pixels. If the
    source image is smaller than that in either dimension it is resized to
    a square of that edge. The result goes to a uniquely named temporary
    PNG that is handed to ``encode``.

    Args:
        im: Anything ``Image.open`` accepts.
        data: Payload to hide; must support ``len()``.

    Returns:
        Whatever ``encode`` returns for the temporary image.
    """
    tmp_path = f'tmpim{uniq.uuid4()}.png'

    byte_size = len(data)
    print (f'bytes:{byte_size}')
    data_pixels = byte_size*4
    print (f'pixels:{data_pixels}')
    data_pad = int(math.sqrt(data_pixels))+100
    print (f'square image:{data_pad}x{data_pad}')

    img1 = Image.open(im)
    imgw, imgh = img1.size
    print (f'qr Size:{img1.size}')
    if data_pad > imgw or data_pad > imgh:
        img1 = img1.resize((data_pad,data_pad), Image.Resampling.LANCZOS)
    print (img1.size)
    img1.save(tmp_path)

    # The old code also read the file back and base64-encoded it, then
    # discarded the result; that dead work is gone.
    try:
        return encode(tmp_path,data)
    finally:
        # The temp name is random and never returned, so nothing else can
        # rely on it; remove it instead of leaking one file per call.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
################################### | |
def address(im):
    """Derive the address string and RSA key from a key image.

    The private key is extracted with ``decode`` and imported with the
    fixed passphrase. Its public half is exported as PEM and written to
    ``receiver.pem``. The address is the base58 encoding of
    RIPEMD160(SHA256(public PEM)), with each digest taken as a hex string.

    Args:
        im: Path of the image carrying the private key.

    Returns:
        ``(address, key)``: the address as ``str`` and the imported key.
    """
    secret_code="SECRET PASSWORD"
    # The decoded private key is deliberately not printed: the passphrase
    # is fixed in source, so logging the PEM exposes the key.
    priv_key = decode(im)
    key = RSA.import_key(priv_key,passphrase=secret_code)

    public_key = key.publickey().export_key('PEM')
    with open("receiver.pem", "wb") as file_out_pub:
        file_out_pub.write(public_key)

    hash_1 = calculate_hash(public_key, hash_function="sha256")
    hash_2 = calculate_hash(hash_1, hash_function="ripemd160")
    # b58encode returns bytes; decoding yields the same text the old
    # str()/strip("b")/strip("'") chain produced.
    wallet_address = base58.b58encode(hash_2).decode("ascii")
    print (wallet_address)
    return (wallet_address,key)
################ crypt ##################### | |
def calculate_hash(data, hash_function: str = "sha256") -> str:
    """Return the hex digest of ``data`` under the named hash function.

    Args:
        data: ``str`` (hashed as UTF-8) or a bytes-like object.
        hash_function: ``"sha256"`` or ``"ripemd160"``.

    Returns:
        The digest as a hexadecimal string.

    Raises:
        ValueError: If ``hash_function`` is not one of the supported
            names (previously this silently returned ``None``).
    """
    if hash_function not in ("sha256", "ripemd160"):
        raise ValueError(f"Unsupported hash function: {hash_function!r}")

    if isinstance(data, str):
        data = data.encode("utf-8")

    hasher = SHA256.new() if hash_function == "sha256" else RIPEMD160.new()
    hasher.update(data)
    return hasher.hexdigest()
def generate_keys():
    """Create a 2048-bit RSA key pair plus its address and carrier images.

    Side effects: writes ``private.pem`` (passphrase-protected PKCS#8),
    ``receiver.pem`` (public PEM), ``address_im.png`` and
    ``private_key_im.png``. Expects ``private_key.png`` to exist as the
    base picture for the private-key image.

    Returns:
        ``(public_key, private_key, address_im, address, priv_key, pub_key)``:
        the two exported keys as bytes, the path ``"address_im.png"``, the
        base58 address as bytes, and the ``conv_im`` results carrying the
        private and public key.
    """
    secret_code="SECRET PASSWORD"
    key = RSA.generate(2048)
    private_key = key.export_key(passphrase=secret_code, pkcs=8,
                                 protection="scryptAndAES128-CBC")
    # The private key is deliberately not printed: the passphrase is fixed
    # in source, so logging the PEM exposes the key.
    with open("private.pem", "wb") as file_out_priv:
        file_out_priv.write(private_key)

    public_key = key.publickey().export_key('PEM')
    with open("receiver.pem", "wb") as file_out_pub:
        file_out_pub.write(public_key)

    hash_1 = calculate_hash(public_key, hash_function="sha256")
    hash_2 = calculate_hash(hash_1, hash_function="ripemd160")
    address = base58.b58encode(hash_2)

    # NOTE(review): `address` is bytes, so make_qr embeds its b'...' repr.
    # Left unchanged so existing QR payloads stay compatible.
    address_qr = make_qr(txt=address)
    # Same text the old str()/strip("b")/strip("'") chain produced.
    add_label = address.decode("ascii")

    labelled_address = textover(address_qr, "Wallet",add_label)
    labelled_address.save("address_im.png")
    address_im = "address_im.png"

    private_key_im = textover("private_key.png", "Key",add_label)
    private_key_im.save("private_key_im.png")

    priv_key = conv_im("private_key_im.png",data=private_key)
    pub_key = conv_im("address_im.png",data=public_key)
    return public_key,private_key,address_im,address,priv_key,pub_key
########********************************########################### | |
def encrypt_trans(data,pub_key):
    """Hybrid-encrypt ``data`` for the holder of ``pub_key``.

    A random 16-byte session key is encrypted with RSA-OAEP, and the
    payload is encrypted with AES in EAX mode under that session key.

    Args:
        data: Value to encrypt; converted with ``str()`` and UTF-8 encoded.
        pub_key: Text of a Python literal (optionally wrapped in double
            quotes) that evaluates to the recipient's public key.

    Returns:
        ``enc_session_key + nonce + tag + ciphertext`` as bytes. The same
        bytes are also written to ``encrypted_data.bin``.
    """
    # Local import so this block does not depend on the file's import list.
    import ast

    plaintext = str(data).encode("utf-8")
    pub_key = pub_key.strip('"')
    # SECURITY: pub_key is caller-supplied text. It used to go through
    # eval(), which would execute arbitrary expressions; literal_eval only
    # accepts plain literals such as b'-----BEGIN PUBLIC KEY-----...'.
    recipient_key = RSA.import_key(ast.literal_eval(pub_key))
    session_key = get_random_bytes(16)

    # Encrypt the session key with the public RSA key
    cipher_rsa = PKCS1_OAEP.new(recipient_key)
    enc_session_key = cipher_rsa.encrypt(session_key)

    # Encrypt the data with the AES session key
    cipher_aes = AES.new(session_key, AES.MODE_EAX)
    ciphertext, tag = cipher_aes.encrypt_and_digest(plaintext)

    # Assemble once in memory; the file is still written, but the result
    # no longer depends on reading it back.
    file_data = b"".join((enc_session_key, cipher_aes.nonce, tag, ciphertext))
    with open("encrypted_data.bin", "wb") as file_out:
        file_out.write(file_data)
    return file_data
def decrypt_trans(data,im):
    """Decrypt a blob produced by ``encrypt_trans``.

    Args:
        data: Text of a Python bytes literal holding
            ``enc_session_key + nonce + tag + ciphertext``.
        im: Path of the image carrying the private key (read with
            ``decode``).

    Returns:
        The decrypted payload decoded as UTF-8.

    Raises:
        ValueError: From ``decrypt_and_verify`` if the tag does not match,
            or from parsing if ``data`` is not a valid literal.
    """
    # Local import so this block does not depend on the file's import list.
    import ast

    print(f' DATA ::: {data}')
    # SECURITY: `data` is caller-supplied text. It used to go through
    # eval(), which would execute arbitrary expressions; literal_eval only
    # accepts plain literals.
    enc_in = ast.literal_eval(data)

    secret_code="SECRET PASSWORD"
    # The decoded private key is deliberately not printed: the passphrase
    # is fixed in source, so logging the PEM exposes the key.
    priv_key = decode(im)
    private_key = RSA.import_key(priv_key,passphrase=secret_code)

    # Layout: RSA-sized session key, 16-byte nonce, 16-byte tag, ciphertext.
    key_len = private_key.size_in_bytes()
    nonce_end = key_len+16
    tag_end = key_len+32
    enc_session_key = enc_in[:key_len]
    nonce = enc_in[key_len:nonce_end]
    tag = enc_in[nonce_end:tag_end]
    ciphertext = enc_in[tag_end:]
    print (f'enc_session_key::{enc_session_key}')
    print (f'nonce::{nonce}')
    print (f'tag::{tag}')
    print (f'ciphertext::{ciphertext}')

    # Decrypt the session key with the private RSA key
    cipher_rsa = PKCS1_OAEP.new(private_key)
    session_key = cipher_rsa.decrypt(enc_session_key)

    # Decrypt the data with the AES session key
    cipher_aes = AES.new(session_key, AES.MODE_EAX, nonce)
    plaintext = cipher_aes.decrypt_and_verify(ciphertext, tag)
    return(plaintext.decode("utf-8"))
########********************************########################### | |
def encrypt_text(data,pub_im,priv_im,address):
    """Encrypt text for the key hidden in ``pub_im`` and wrap it in an image.

    The public key is read from ``pub_im`` with ``decode``. ``data`` is
    encrypted with AES-EAX under a random session key that is itself
    RSA-OAEP encrypted. The blob is written to and re-read from
    ``encrypted_data.bin``, then embedded with ``conv_im`` into a labelled
    QR image of the address derived from that public key.

    Args:
        data: Text to encrypt.
        pub_im: Path of the image carrying the recipient's public key.
        priv_im: Not used by this function.
        address: Ignored; the address is recomputed from the public key.

    Returns:
        ``(str(blob), image)``: the blob's string form and the ``conv_im``
        result.
    """
    pub_key = decode(pub_im)
    plaintext = data.encode("utf-8")
    recipient_key = RSA.import_key(pub_key)
    session_key = get_random_bytes(16)

    # Wrap the session key with the recipient's RSA key
    enc_session_key = PKCS1_OAEP.new(recipient_key).encrypt(session_key)

    # Protect the payload with AES under the session key
    cipher_aes = AES.new(session_key, AES.MODE_EAX)
    ciphertext, tag = cipher_aes.encrypt_and_digest(plaintext)

    file_out = open("encrypted_data.bin", "wb")
    for part in (enc_session_key, cipher_aes.nonce, tag, ciphertext):
        file_out.write(part)
    file_out.close()

    doc_name = "encrypted_data.bin"
    with open(doc_name, "rb") as file:
        file_data = file.read()
    print (f'file_data::{file_data}')

    sha_hex = calculate_hash(pub_key, hash_function="sha256")
    ripemd_hex = calculate_hash(sha_hex, hash_function="ripemd160")
    address = base58.b58encode(ripemd_hex)
    add_label = str(address).strip("b").strip("'")

    trans_im1 = make_qr(txt=address, color_b="#ECFD08")
    private_key_im = textover(trans_im1, "Transaction",add_label)
    private_key_im.save("private_key_im.png")
    enc_qr = conv_im("private_key_im.png",data=file_data)
    return str(file_data),enc_qr
def decrypt_text(im,in2):
    """Decrypt the blob hidden in ``im`` with the key hidden in ``in2``.

    Args:
        im: Path of the image carrying
            ``enc_session_key + nonce + tag + ciphertext``.
        in2: Path of the image carrying the private key.

    Returns:
        The decrypted payload decoded as UTF-8.

    Raises:
        ValueError: From ``decrypt_and_verify`` if the tag does not match.
    """
    enc_in = decode(im)
    secret_code="SECRET PASSWORD"
    # The decoded private key is deliberately not printed: the passphrase
    # is fixed in source, so logging the PEM exposes the key.
    priv_key = decode(in2)
    private_key = RSA.import_key(priv_key,passphrase=secret_code)
    print(f'private_key:: {private_key}')

    # Layout: RSA-sized session key, 16-byte nonce, 16-byte tag, ciphertext.
    key_len = private_key.size_in_bytes()
    nonce_end = key_len+16
    tag_end = key_len+32
    enc_session_key = enc_in[:key_len]
    nonce = enc_in[key_len:nonce_end]
    tag = enc_in[nonce_end:tag_end]
    ciphertext = enc_in[tag_end:]
    print (f'enc_session_key::{enc_session_key}')
    print (f'nonce::{nonce}')
    print (f'tag::{tag}')
    print (f'ciphertext::{ciphertext}')

    # Decrypt the session key with the private RSA key
    cipher_rsa = PKCS1_OAEP.new(private_key)
    session_key = cipher_rsa.decrypt(enc_session_key)

    # Decrypt the data with the AES session key
    cipher_aes = AES.new(session_key, AES.MODE_EAX, nonce)
    plaintext = cipher_aes.decrypt_and_verify(ciphertext, tag)
    return(plaintext.decode("utf-8"))
def test_fn(im1,im2): | |
return im1,im2 |