class SUniward(BaseCodec):
"""
SUniward is a steganography method for Image.
Source code derived from:
[S-UNIWARD-python](https://github.com/TracyCuiq/S-UNIWARD-python)
Only encode function is available.
"""
def __init__(self) -> None:
super().__init__()
np.set_printoptions(threshold=sys.maxsize)
def encode(self, carrier: Image.Image, payload: float) -> Image.Image:
"""Merge secret_img into host image
Args:
carrier: host image
payload: payload in bits per pixel
Returns:
Image: stego image.
"""
cover = np.array(carrier)
stego = self.s_uniward(cover, payload)
return stego
def decode(self, carrier: Image.Image) -> None:
# TODO: decode
raise NotImplementedError("decode is not implemented")
def s_uniward(self, cover: np.ndarray, payload: float) -> Image.Image:
sgm = 1
# Get 2D wavelet filters - Daubechies 8
# 1D high pass decomposition filter
hpdf_list = [
-0.0544158422,
0.3128715909,
-0.6756307363,
0.5853546837,
0.0158291053,
-0.2840155430,
-0.0004724846,
0.1287474266,
0.0173693010,
-0.0440882539,
-0.0139810279,
0.0087460940,
0.0048703530,
-0.0003917404,
-0.0006754494,
-0.0001174768,
]
# 1D low pass decomposition filter
hpdf_len = range(0, len(hpdf_list))
hpdf_list_reverse = hpdf_list[::-1]
lpdf_list = hpdf_list
for i in range(len(hpdf_list)):
lpdf_list[i] = ((-1) ** hpdf_len[i]) * hpdf_list_reverse[i]
hpdf_array = np.array([hpdf_list])
lpdf_array = np.array([lpdf_list])
lpdf = lpdf_array.reshape(len(lpdf_list), 1)
hpdf = hpdf_array.reshape(len(hpdf_list), 1)
# construction of 2D wavelet filters
f1 = lpdf * hpdf_array
f2 = hpdf * lpdf_array
f3 = hpdf * hpdf_array
w_f = np.zeros((f1.shape[0], f1.shape[0], 3))
w_f[:, :, 0] = f1
w_f[:, :, 1] = f2
w_f[:, :, 2] = f3
# Get embedding costs
# initialization
wetcost = 100000000
cover_k, cover_l, _ = cover.shape
# add padding
s1, _1 = f1.shape
s2, _2 = f2.shape
s3, _3 = f3.shape
pad_size = max(s1, s2, s3)
cover_padded = np.zeros((cover_k + pad_size * 2, cover_l + pad_size * 2, 3))
for i in range(3):
cover_padded[:, :, i] = np.lib.pad(cover[:, :, i], pad_size, "symmetric")
xi = np.zeros((cover_k + pad_size * 2, cover_l + pad_size * 2, 3))
x = np.zeros((cover_k, cover_l, 3))
for i in range(3):
# compute residual
r = convolve2d(cover_padded[:, :, i], w_f[:, :, i], mode="same")
xi[:, :, i] = convolve2d(
1.0 / (np.abs(r) + sgm), np.rot90(abs(w_f[:, :, i]), 2), mode="same"
)
# correct the suitability shift if filter size is even
if s1 % 2 == 0:
xi[:, :, i] = np.roll(xi[:, :, i], [1, 0])
xi[:, :, i] = np.roll(xi[:, :, i], [0, 1])
# remove padding
s_xi, __xi = xi[:, :, i].shape
x[:, :, i] = xi[
int((s_xi - cover_k) / 2) : int(-(s_xi - cover_k) / 2),
int((__xi - cover_l) / 2) : int(-(__xi - cover_l) / 2),
i,
]
# compute embedding costs \rho
# rho = np.zeros((k, l))
rho = x[:, :, 0] + x[:, :, 1] + x[:, :, 2]
# adjust embedding costs
a, b = np.where(rho > wetcost)
for i in range(len(a)):
rho[a[i], b[i]] = wetcost # threshold on the costs
a, b = np.where(np.isnan(rho))
for i in range(len(a)):
rho[a[i], b[i]] = wetcost # if all xi{} are zero threshold the cost
# k, k_ = rho.shape
rhop1 = np.zeros((cover_k, cover_l, 3))
rhom1 = np.zeros((cover_k, cover_l, 3))
for i in range(3):
rhop1[:, :, i] = rho
rhom1[:, :, i] = rho
# a, b, c = np.where(cover - 255.0 <= 0.1)
a, b, c = np.where(cover == 255)
for i in range(len(a)):
rhop1[
a[i], b[i], c[i]
] = wetcost # do not embed +1 if the pixel has max value
# a, b, c = np.where(cover - 0 <= 0.1)
a, b, c = np.where(cover == 0)
for i in range(len(a)):
rhom1[
a[i], b[i], c[i]
] = wetcost # do not embed -1 if the pixel has min value
# Embedding simulator ##
cover_len = len(cover[:, :, 0]) * len(cover[:, :, 0])
stego = cover
for i in range(3):
stego[:, :, i] = self.embedding_simulator_singel(
cover[:, :, i],
rhop1[:, :, i],
rhom1[:, :, i],
payload * cover_len,
fix_embedding_changes=False,
)
return Image.fromarray(stego)
def embedding_simulator_singel(
self,
x: np.ndarray,
rhop1: np.ndarray,
rhom1: np.ndarray,
m: float,
fix_embedding_changes: bool = False,
) -> np.ndarray:
w, h = x.shape
cover_len = w * h
lam = self.cal_lambda_(rhop1, rhom1, m, cover_len)
shape = rhop1.shape
pchange_p1 = [
(math.exp(-lam * rhop1[i][j]))
/ (1 + math.exp(-lam * rhop1[i][j]) + math.exp(-lam * rhom1[i][j]))
for j in range(shape[1])
for i in range(shape[0])
]
pchange_m1 = [
(math.exp(-lam * rhom1[i][j]))
/ (1 + math.exp(-lam * rhop1[i][j]) + math.exp(-lam * rhom1[i][j]))
for j in range(shape[1])
for i in range(shape[0])
]
pchange_p1_array = np.array(pchange_p1).reshape(shape[1], shape[0]).T
pchange_m1_array = np.array(pchange_m1).reshape(shape[1], shape[0]).T
if fix_embedding_changes:
np.random.seed(139187)
rand_change = np.random.rand(w, h)
y = x
arr0, _0 = np.where(rand_change < pchange_p1_array)
for i in range(len(arr0)):
y[arr0[i]][_0[i]] += 1
arr1, _1 = np.where(
(rand_change >= pchange_p1_array)
& (rand_change < pchange_p1_array + pchange_m1_array)
)
for i in range(len(arr1)):
y[arr1[i]][_1[i]] -= 1
return y
def cal_lambda_(
self, rhop1: np.ndarray, rhom1: np.ndarray, message_length: float, n: int
) -> float:
l3 = 1e3
m3 = float(message_length + 1)
iterations = 0
lam = 0.0
while m3 > message_length:
pp1 = rhop1
# shape = lambda x: pp1.shape if pp1.shape == pm1.shape else 0
shape = pp1.shape
l3 = l3 * 2
pp1 = [
(math.exp(-l3 * rhop1[i][j]))
/ (1 + math.exp(-l3 * rhop1[i][j]) + math.exp(-l3 * rhom1[i][j]))
for j in range(shape[1])
for i in range(shape[0])
] # list
pm1 = [
(math.exp(-l3 * rhom1[i][j]))
/ (1 + math.exp(-l3 * rhop1[i][j]) + math.exp(-l3 * rhom1[i][j]))
for j in range(shape[1])
for i in range(shape[0])
] # list
m3 = self.ternary_entropyf_4list(pp1, pm1)
iterations = iterations + 1
if iterations > 10:
return l3
l1 = 0.0
m1 = float(n)
lam = 0.0
alpha = message_length / n
# limit search to 30 iterations
while (m1 - m3) / n > alpha / 1000.0 and iterations < 30:
lam = l1 + (l3 - l1) / 2
pp1 = [
(math.exp(-lam * rhop1[i][j]))
/ (1 + math.exp(-lam * rhop1[i][j]) + math.exp(-lam * rhom1[i][j]))
for j in range(shape[1])
for i in range(shape[0])
]
pm1 = [
(math.exp(-lam * rhom1[i][j]))
/ (1 + math.exp(-lam * rhop1[i][j]) + math.exp(-lam * rhom1[i][j]))
for j in range(shape[1])
for i in range(shape[0])
]
m2 = self.ternary_entropyf_4list(pp1, pm1)
if m2 < message_length:
l3 = lam
m3 = m2
else:
l1 = lam
m1 = m2
iterations = iterations + 1
return lam
def ternary_entropyf_4list(self, pp1_: list, pm1_: list) -> float:
p0 = [1 - pp1_[i] - pm1_[i] for i in range(len(pp1_))]
p = p0 + pp1_ + pm1_
ht = 0.0
for i in range(len(p)):
if p[i] != 0:
h = -(p[i] * math.log(p[i]))
ht += h
return ht