class JStego(BaseCodec):
    """JSteg-style steganography on quantized DCT coefficients.

    Only channel index 1 of the carrier (the green channel of an RGB image)
    is used. It is split into 8 x 8 blocks; every block is transformed with
    ``cv2.dct`` and divided by the quantization table ``Q``. Rows and columns
    beyond the last complete 8 x 8 block are left untouched.

    * To encode: the payload is UTF-8 encoded and turned into a bit sequence
      preceded by a 12-bit length header. The coefficients are visited in
      row-major order over the channel; the first coefficient of every block
      (the DC term) and every coefficient equal to -1, 0 or 1 are skipped.
      The lowest bit of the magnitude of each remaining coefficient is
      replaced by the next bit of the sequence. The channel is then
      de-quantized, inverse transformed and written back into the image.
    * To decode: the coefficients are recomputed the same way, the lowest
      bits are read back in the same order, and the header tells how many of
      them belong to the payload.

    NOTE(review): decoding recomputes the coefficients from pixel values, so
    the encoded image presumably has to be stored losslessly, and pixel
    rounding/clipping may still disturb individual coefficients - confirm.
    """

    # Number of bits of the length header written in front of the payload bits.
    _LENGTH_BITS = 12

    def __init__(self) -> None:
        # Standard (JPEG luminance) quantization table
        self.Q = np.array(
            [
                [16, 11, 10, 16, 24, 40, 51, 61],
                [12, 12, 14, 19, 26, 58, 60, 55],
                [14, 13, 16, 24, 40, 57, 69, 56],
                [14, 17, 22, 29, 51, 87, 80, 62],
                [18, 22, 37, 56, 68, 109, 103, 77],
                [24, 35, 55, 64, 81, 104, 113, 92],
                [49, 64, 78, 87, 103, 121, 120, 101],
                [72, 92, 95, 98, 112, 100, 103, 99],
            ]
        )
        super().__init__()

    def encode(self, carrier: Image.Image, payload: str) -> Image.Image:
        """Hide ``payload`` in channel 1 of ``carrier``.

        Args:
            carrier: Carrier image. It must convert to an array with a channel
                axis (height x width x channels) holding at least two channels.
            payload: Payload (secret message) to be encoded.

        Returns:
            Image: a new image carrying the payload; ``carrier`` is not modified.

        Raises:
            ValueError: if the payload is empty, does not fit the 12-bit length
                header, or needs more usable coefficients than the carrier has.
        """
        self._check_empty_payload(payload)
        bits = self._payload_to_bits(payload).astype(np.float32)

        img = np.array(carrier)
        h, w = img[:, :, 1].shape
        block_y = h // 8
        block_x = w // 8
        # astype() returns a copy, so the in-place DCT never touches `img`.
        d = self._dct(img[:, :, 1].astype(np.float32), block_y, block_x)

        # `region` is a view: writing to it updates `d`.
        region = d[: block_y * 8, : block_x * 8]
        # np.nonzero lists positions in row-major order, which is the order
        # decode() reads them back in.
        rows, cols = np.nonzero(self._usable_mask(region))
        if rows.size < bits.size:
            raise ValueError(
                f"Payload needs {bits.size} usable coefficients, "
                f"carrier only offers {rows.size}."
            )
        rows = rows[: bits.size]
        cols = cols[: bits.size]

        values = region[rows, cols]
        magnitude = np.abs(values)
        # Replace the lowest bit of the magnitude and keep the sign. The
        # magnitude never drops below 2, so the position stays usable.
        region[rows, cols] = np.sign(values) * (magnitude - magnitude % 2 + bits)

        d = self._idct(d, block_y, block_x)
        # Clamp to the valid pixel range before rounding to uint8.
        d = np.uint8(np.round(np.clip(d, 0, 255)))
        img[:, :, 1] = d
        return Image.fromarray(np.uint8(img))

    def decode(self, carrier: Image.Image) -> str:
        """Extract the payload hidden in channel 1 of ``carrier``.

        Args:
            carrier: Image produced by :meth:`encode`.

        Returns:
            str: the decoded payload.

        Raises:
            ValueError: if the carrier holds fewer usable coefficients than the
                length header announces, if the announced length is not a whole
                number of bytes, or if the bytes are not valid UTF-8
                (``UnicodeDecodeError`` is a ``ValueError``).
        """
        img = np.array(carrier)
        channel = img[:, :, 1].astype(np.float32)
        h, w = channel.shape
        block_y = h // 8
        block_x = w // 8
        d = self._dct(channel, block_y, block_x)

        region = d[: block_y * 8, : block_x * 8]
        # Boolean indexing returns the values in row-major order.
        values = region[self._usable_mask(region)]
        bits = np.mod(values, 2).astype(np.uint8)

        header = self._LENGTH_BITS
        if bits.size < header:
            raise ValueError("Carrier is too small to hold a length header.")
        length = int("".join(map(str, bits[:header].tolist())), 2)
        if length % 8:
            raise ValueError(
                f"Length header announces {length} bits, not a whole number of bytes."
            )
        end = header + length
        if bits.size < end:
            raise ValueError(
                f"Length header announces {length} bits, "
                f"carrier only holds {bits.size - header}."
            )
        # packbits keeps leading zero bits, so payloads starting with a small
        # byte value survive the round trip.
        return np.packbits(bits[header:end]).tobytes().decode("utf-8")

    def _check_empty_payload(self, payload: str) -> None:
        """Raise ``ValueError`` if ``payload`` is empty."""
        if not payload:
            raise ValueError("Payload must not be empty.")

    def _payload_to_bits(self, payload: str) -> np.ndarray:
        """Return the length header followed by the UTF-8 bits of ``payload``.

        Bits are most-significant first, both in the header and in each byte.

        Raises:
            ValueError: if the bit count does not fit the length header.
        """
        body = np.unpackbits(np.frombuffer(payload.encode("utf-8"), dtype=np.uint8))
        if body.size >= 1 << self._LENGTH_BITS:
            raise ValueError(
                f"Payload is {body.size} bits long, the {self._LENGTH_BITS}-bit "
                f"length header allows at most {(1 << self._LENGTH_BITS) - 1}."
            )
        header = np.array(
            [int(bit) for bit in format(body.size, f"0{self._LENGTH_BITS}b")],
            dtype=np.uint8,
        )
        return np.concatenate((header, body))

    @staticmethod
    def _usable_mask(coeffs: np.ndarray) -> np.ndarray:
        """Return a boolean mask of the coefficients that may carry a bit.

        The first coefficient of each 8 x 8 block and all coefficients equal
        to -1, 0 or 1 are excluded. ``coeffs`` holds rounded (integral) values,
        so a magnitude above 1 is the same as "not -1, 0 or 1".
        """
        mask = np.abs(coeffs) > 1
        mask[::8, ::8] = False
        return mask

    def _dct(self, before: np.ndarray, block_y: int, block_x: int) -> np.ndarray:
        """Transform and quantize ``before`` in place, one 8 x 8 block at a time.

        Args:
            before: Float array holding the channel; it is overwritten.
            block_y: Number of complete 8-row blocks.
            block_x: Number of complete 8-column blocks.

        Returns:
            np.ndarray: ``before`` itself, now holding quantized coefficients.
        """
        for i in range(block_y):
            for j in range(block_x):
                tile = (slice(8 * i, 8 * (i + 1)), slice(8 * j, 8 * (j + 1)))
                before[tile] = np.around(cv2.dct(before[tile]))
                before[tile] = np.around(np.divide(before[tile], self.Q))
        return before

    def _idct(self, before: np.ndarray, block_y: int, block_x: int) -> np.ndarray:
        """De-quantize and inverse transform ``before`` in place, block by block.

        Args:
            before: Float array holding quantized coefficients; it is overwritten.
            block_y: Number of complete 8-row blocks.
            block_x: Number of complete 8-column blocks.

        Returns:
            np.ndarray: ``before`` itself, now holding pixel values.
        """
        for i in range(block_y):
            for j in range(block_x):
                tile = (slice(8 * i, 8 * (i + 1)), slice(8 * j, 8 * (j + 1)))
                before[tile] = np.multiply(before[tile], self.Q)
                before[tile] = cv2.idct(before[tile])
        return before