class PVDStego(BaseCodec):
"""PVD Steganography: hide txt in png
This module implements LSB substitution and PVD steganography in PNG images.
* To encode: This method is to convert the pixel difference of the input image to
the number of bits that can be used from the Least Significant bits to replace
with the secret data.
* To decode: The least significant bits of the encoded image are read off with the
calculation of the pixel difference of the cover image. After which the secret
message would be recovered.
Originally implemented in
[tony-josi/pvd_steganography](https://github.com/tony-josi/pvd_steganography)
"""
def _pvd_table(self, p_diff: int) -> int:
nbits = 0
if p_diff < 16:
nbits = 2
elif 16 < p_diff < 32:
nbits = 3
else:
nbits = 4
return nbits
def _embed_capacity(self, img: Image.Image) -> int:
# Calculated the embedding capacity of a given image.
embed_capacity = 0
img_height, img_width = img.size
no_of_matrix_h = img_height // 3 - 1
no_of_matrix_w = img_width // 3 - 1
if no_of_matrix_h < 1 or no_of_matrix_w < 1 or len(img.getpixel((0, 0))) < 3:
return embed_capacity
# Split the image to [3 x 3] blocks
for height_itr in range(0, no_of_matrix_h * 3, 3):
for width_itr in range(0, no_of_matrix_w * 3, 3):
# Get the middle pixel as reference pixel
ref_rgb = img.getpixel((height_itr + 1, width_itr + 1))
# Iterate through the remaining pixels in the order
# [1 2 3]
# [4 - 5]
# [6 7 8]
for h_j in range(height_itr, height_itr + 3):
for w_i in range(width_itr, width_itr + 3):
if w_i == width_itr + 1 or h_j == height_itr + 1:
continue
c_rgb = img.getpixel((h_j, w_i))
embed_capacity += (
self._pvd_table(abs(c_rgb[0] - ref_rgb[0]))
+ self._pvd_table(abs(c_rgb[1] - ref_rgb[1]))
+ self._pvd_table(abs(c_rgb[2] - ref_rgb[2]))
)
return embed_capacity // 8
def _replace_lsbs(self, pixel: int, bits: int, value: int) -> int:
# Replace the given LS bits with given data
mask = (1 << bits) - 1
pixel &= ~mask
return pixel | value
def _get_lsbs(self, pixel: int, bits: int) -> int:
# Get the given number of LS bits data
mask = (1 << bits) - 1
pixel &= mask
return pixel
def _embed_data(self, img: Image.Image, payload_bytes: bytes) -> Image.Image:
"""Embed the secret file into the carrier png image.
Args:
img: The cover image.
payload_bytes: The secret txt file to embed.
Returns:
The encoded PNG image object with the secret file embedded.
"""
embedded_ds = 0
bits_reader = FileBitsReader(payload_bytes)
img_height, img_width = img.size
no_of_matrix_h = img_height // 3 - 1
no_of_matrix_w = img_width // 3 - 1
if no_of_matrix_h < 1 or no_of_matrix_w < 1 or len(img.getpixel((0, 0))) < 3:
raise Exception("Invalid image, too small or not in RGB format")
for height_itr in range(0, no_of_matrix_h * 3, 3):
for width_itr in range(0, no_of_matrix_w * 3, 3):
ref_rgb = img.getpixel((height_itr + 1, width_itr + 1))
for h_j in range(height_itr, height_itr + 3):
for w_i in range(width_itr, width_itr + 3):
if w_i == width_itr + 1 or h_j == height_itr + 1:
continue
c_rgb = img.getpixel((h_j, w_i))
c_rgb_list = list(c_rgb)
done_embedding = False
for rgb in range(3):
bits_reqd = self._pvd_table(abs(c_rgb[rgb] - ref_rgb[rgb]))
embedded_ds += bits_reqd
# Get the required number of bits corresponding to
# the pixel difference from the current pixel and
# reference pixel from the input file.
ret_val = bits_reader.get_bits(bits_reqd)
# Replace the LSBs of the pixel with the file data.
c_rgb_list[rgb] = self._replace_lsbs(
c_rgb[rgb], ret_val[2], ret_val[1]
)
if ret_val[0] is True:
done_embedding = True
break
# Replace the pixel value with modified contents
img.putpixel((h_j, w_i), tuple(c_rgb_list)) # type: ignore
if done_embedding:
return img
raise Exception("Failed to embed the data")
def _extract_data(self, ref_img: Image.Image, pvd_img: Image.Image, output: str):
"""Extract the secret file from the encoded image.
Args:
ref_image: The encoded steganographic image.
pvd_img: The original cover image.
output: The output txt file.
"""
bits_writer = FileBitsWriter(output)
embedded_ds = 0
ref_img_height, ref_img_width = pvd_img.size
pvd_img_height, pvd_img_width = pvd_img.size
if ref_img_height != pvd_img_height or ref_img_width != pvd_img_width:
raise ValueError("Ref vs embedded image not matching")
no_of_matrix_h = ref_img_height // 3 - 1
no_of_matrix_w = ref_img_width // 3 - 1
if (
no_of_matrix_h < 1
or no_of_matrix_w < 1
or len(ref_img.getpixel((0, 0))) < 3
):
return embedded_ds
magic_extracted = False
eof_reached = False
encoded_size = 0
for height_itr in range(0, no_of_matrix_h * 3, 3):
for width_itr in range(0, no_of_matrix_w * 3, 3):
ref_rgb = ref_img.getpixel((height_itr + 1, width_itr + 1))
for h_j in range(height_itr, height_itr + 3):
for w_i in range(width_itr, width_itr + 3):
if w_i == width_itr + 1 or h_j == height_itr + 1:
continue
c_rgb = ref_img.getpixel((h_j, w_i))
pvd_c_rgb = pvd_img.getpixel((h_j, w_i))
# c_rgb_list = list(c_rgb)
for rgb in range(3):
bits_reqd = self._pvd_table(abs(c_rgb[rgb] - ref_rgb[rgb]))
embedded_ds += bits_reqd
data = self._get_lsbs(pvd_c_rgb[rgb], bits_reqd)
bits_writer.set_bits(eof_reached, bits_reqd, data)
if (
magic_extracted
and (encoded_size + PVD_HEADER_SIZE)
== bits_writer.bytes_wrote_to_file_so_far
):
eof_reached = True
# If we have completed reading the header?
if (
bits_writer.bytes_wrote_to_file_so_far
>= (PVD_HEADER_SIZE)
) and magic_extracted is False:
magic_extracted = True
magic = bits_writer.data[:PVD_HEADER_SIZE]
pvd_magic = magic[:4]
pvd_versn = magic[4:7]
# Check if the magic and version are matching
if pvd_magic != PVD_MAGIC or pvd_versn != PVD_VERSION:
raise ValueError(
"Invalid version or image... magic: "
f"{pvd_magic} versn: {pvd_versn}"
)
size_arr = magic[-4:]
# Parse the encoded data size in the image
encoded_size = (
(size_arr[0] << 24)
+ (size_arr[1] << 16)
+ (size_arr[2] << 8)
+ (size_arr[3] << 0)
)
if eof_reached:
# If we have read all the data required then
# close the file
bits_writer.close_file()
return embedded_ds
return -1
def encode(self, carrier: Image.Image, payload: bytes) -> Image.Image:
"""Encode the secret txt file into the cover image using PVD.
Args:
carrier: The cover image.
payload: The secret txt file to embed. Read with
`stegobox.io.txt.read_bytes()`.
"""
embed_cap = self._embed_capacity(carrier)
s_f_size = len(payload)
if embed_cap < s_f_size:
raise ValueError(
"ERROR: Secret file size is more than embedding capacity of image - "
f"Capacity: {embed_cap} bytes, secret file size: {s_f_size} bytes."
)
return self._embed_data(carrier, payload)
def decode(self, _: str) -> None:
"""
PVDStego requires a reference image to decode, use `decode_with_original()` and
pass a reference to the original image instead of this `decode()` function.
"""
raise ValueError("Decoding requires the original carrier image as well.")
def decode_with_original(
self, carrier: Image.Image, original: Image.Image, output: str
) -> None:
"""Using both the encoded steganographic image and the original cover image for
extracting the payload with PVD.
The reason why we need the class FileBitsWriter rather than the function
`io.txt.write_bytes()` is that the output file is generated as the encoded
steganographic image is decoded.
Args:
carrier: The encoded steganographic image.
original: The original cover image.
output: The output txt file.
"""
self._extract_data(original, carrier, output)