class WOW(BaseCodec):
"""
WOW is a steganography method for Image.
Source code derived from: [JisongXie/WOW](https://github.com/JisongXie/WOW)
Only encode function is available.
"""
def __init__(self) -> None:
super().__init__()
def wow(self, cover: np.ndarray, payload: float) -> np.ndarray:
# Get 2D wavelet filters - Daubechies 8
# 1D high pass decomposition filter
hpdf = np.array(
[
[
-0.0544158422,
0.3128715909,
-0.6756307363,
0.5853546837,
0.0158291053,
-0.2840155430,
-0.0004724846,
0.1287474266,
0.0173693010,
-0.0440882539,
-0.0139810279,
0.0087460940,
0.0048703530,
-0.0003917404,
-0.0006754494,
-0.0001174768,
]
]
)
# 1D low pass decomposition filter
lpdf = np.array((-1) ** np.array(range(hpdf.shape[1])) * np.fliplr(hpdf))
# construction of 2D wavelet filters
f = [lpdf.T * hpdf, hpdf.T * lpdf, hpdf.T * hpdf]
# Get embedding costs
# inicialization
cover = cover.astype("float64")
p = -1
wetcost = 10**10
sizecover = cover.shape
# add padding
padsize = np.max(np.concatenate([f[0].shape, f[1].shape, f[2].shape]))
cover_padded = np.pad(
cover, ((padsize, padsize), (padsize, padsize)), "symmetric" # type: ignore
)
# compute directional residual and suitability xi for each filter
xi = []
for f_index in range(0, 3):
# compute residual
r = convolve2d(cover_padded, f[f_index], "same")
# compute suitability
xi.append(convolve2d(abs(r), np.rot90(abs(f[f_index]), 2), "same"))
# correct the suitability shift if filter size is even
if np.mod(np.size(f[f_index], 0), 2) == 0:
xi[f_index] = np.roll(xi[f_index], 1, axis=0)
# xi[fIndex] = circshift(xi[fIndex], [1, 0])
if np.mod(np.size(f[f_index], 1), 2) == 0:
xi[f_index] = np.roll(xi[f_index], 1, axis=1)
# xi[fIndex] = circshift(xi[fIndex], [0, 1])
# remove padding
a_idx_s = int((np.size(xi[f_index], 0) - sizecover[0]) / 2)
a_idx_e = int(
np.size(xi[f_index], 0) - (np.size(xi[f_index], 0) - sizecover[0]) / 2
)
b_idx_s = int((np.size(xi[f_index], 1) - sizecover[1]) / 2)
b_idx_e = int(
np.size(xi[f_index], 1) - (np.size(xi[f_index], 1) - sizecover[1]) / 2
)
xi[f_index] = xi[f_index][a_idx_s:a_idx_e, b_idx_s:b_idx_e]
# compute embedding costs \rho
rho = (xi[0] ** p + xi[1] ** p + xi[2] ** p) ** (-1 / p)
# adjust embedding costs
rho[rho > wetcost] = wetcost # threshold on the costs
rho[np.isnan(rho)] = wetcost # if all xi{} are zero threshold the cost
rhop1 = rho.copy()
rhom1 = rho.copy()
rhop1[cover == 255] = wetcost # do not embed +1 if the pixel has max value
rhom1[cover == 0] = wetcost # do not embed -1 if the pixel has min value
# Embedding simulator
stego = self.embedding_simulator(
cover, rhop1, rhom1, payload * cover.size, False
)
return stego
def embedding_simulator(
self,
x: np.ndarray,
rhop1: np.ndarray,
rhom1: np.ndarray,
m: float,
fix_embedding_changes: bool,
) -> np.ndarray:
n = x.size
m_lambda = self.calc_lambda(rhop1, rhom1, m, n)
pchange_p1 = (np.exp(-m_lambda * rhop1)) / (
1 + np.exp(-m_lambda * rhop1) + np.exp(-m_lambda * rhom1)
)
pchange_m1 = (np.exp(-m_lambda * rhom1)) / (
1 + np.exp(-m_lambda * rhop1) + np.exp(-m_lambda * rhom1)
)
if fix_embedding_changes:
np.random.seed(100)
else:
np.random.seed(int(time.time()))
rand_change = np.random.rand(*x.shape)
y = x.copy()
y[rand_change < pchange_p1] = y[rand_change < pchange_p1] + 1
y[
np.logical_and(
rand_change >= pchange_p1, rand_change < (pchange_p1 + pchange_m1)
)
] = (
y[
np.logical_and(
rand_change >= pchange_p1, rand_change < (pchange_p1 + pchange_m1)
)
]
- 1
)
return y
def calc_lambda(
self, rhop1: np.ndarray, rhom1: np.ndarray, message_length: float, n: int
) -> float:
l3 = 1e3
m3 = np.float64(message_length + 1)
iterations = 0
while m3 > message_length:
l3 = l3 * 2
pp1 = np.exp(-l3 * rhop1) / (1 + np.exp(-l3 * rhop1) + np.exp(-l3 * rhom1))
pm1 = np.exp(-l3 * rhom1) / (1 + np.exp(-l3 * rhop1) + np.exp(-l3 * rhom1))
m3 = self.ternary_entropyf(pp1, pm1)
iterations = iterations + 1
if iterations > 10:
m_lambda = l3
return m_lambda
l1 = 0.0
m1 = np.float64(n)
m_lambda = 0
alpha = float(message_length) / n
# limit search to 30 iterations
# payload embedded is roughly within 1/1000 of the required relative payload
while float(m1 - m3) / n > (alpha / 1000.0) and (iterations < 30):
m_lambda = l1 + (l3 - l1) / 2
pp1 = (np.exp(-l3 * rhop1)) / (
1 + np.exp(-l3 * rhop1) + np.exp(-l3 * rhom1)
)
pm1 = (np.exp(-l3 * rhom1)) / (
1 + np.exp(-l3 * rhop1) + np.exp(-l3 * rhom1)
)
m2 = self.ternary_entropyf(pp1, pm1)
if m2 < message_length:
l3 = m_lambda
m3 = m2
else:
l1 = m_lambda
m1 = m2
iterations = iterations + 1
return m_lambda
def ternary_entropyf(self, pp1: np.ndarray, pm1: np.ndarray) -> np.float64:
eps = 3e-16
p0 = 1 - pp1 - pm1
p = np.concatenate(
[p0.flatten(order="F"), pp1.flatten(order="F"), pm1.flatten(order="F")]
)
p[p == 0] = 1e-16 # clear warning: divide by zero encountered in log2
h = -(p * np.log2(p))
h[np.logical_or(p < eps, p > (1 - eps))] = 0
ht = sum(h)
return np.float64(ht)
def encode(self, carrier: Image.Image, payload: float) -> Image.Image:
"""Merge secret_img into host image
Args:
carrier: host image
payload: payload in bits per pixel
Returns:
Image: stego image.
"""
if carrier.mode == "RGB":
carrier = carrier.convert("L")
cover = np.array(carrier)
stego = Image.fromarray(self.wow(cover, payload))
if stego.mode == "F":
stego = stego.convert("RGB")
return stego
def decode(self, carrier: Image.Image) -> None:
# TODO: decode
raise NotImplementedError("Decode not implemented.")