Skip to content

PVDStego

stegobox.codec.PVDStego

Bases: BaseCodec

PVD Steganography: hide txt in png

This module implements LSB substitution and PVD steganography in PNG images.

  • To encode: This method is to convert the pixel difference of the input image to the number of bits that can be used from the Least Significant bits to replace with the secret data.
  • To decode: The least significant bits of the encoded image are read off with the calculation of the pixel difference of the cover image. After which the secret message would be recovered.

Originally implemented in tony-josi/pvd_steganography

Source code in stegobox/codec/pvd_stego.py
class PVDStego(BaseCodec):
    """PVD Steganography: hide txt in png

    This module implements LSB substitution and PVD steganography in PNG images.

    * To encode: This method is to convert the pixel difference of the input image to
      the number of bits that can be used from the Least Significant bits to replace
      with the secret data.
    * To decode: The least significant bits of the encoded image are read off with the
      calculation of the pixel difference of the cover image. After which the secret
      message would be recovered.

    Originally implemented in
    [tony-josi/pvd_steganography](https://github.com/tony-josi/pvd_steganography)
    """

    def _pvd_table(self, p_diff: int) -> int:
        nbits = 0
        if p_diff < 16:
            nbits = 2
        elif 16 < p_diff < 32:
            nbits = 3
        else:
            nbits = 4
        return nbits

    def _embed_capacity(self, img: Image.Image) -> int:
        # Calculated the embedding capacity of a given image.
        embed_capacity = 0
        img_height, img_width = img.size

        no_of_matrix_h = img_height // 3 - 1
        no_of_matrix_w = img_width // 3 - 1

        if no_of_matrix_h < 1 or no_of_matrix_w < 1 or len(img.getpixel((0, 0))) < 3:
            return embed_capacity

        # Split the image to [3 x 3] blocks
        for height_itr in range(0, no_of_matrix_h * 3, 3):
            for width_itr in range(0, no_of_matrix_w * 3, 3):

                # Get the middle pixel as reference pixel
                ref_rgb = img.getpixel((height_itr + 1, width_itr + 1))

                # Iterate through the remaining pixels in the order
                # [1 2 3]
                # [4 - 5]
                # [6 7 8]
                for h_j in range(height_itr, height_itr + 3):
                    for w_i in range(width_itr, width_itr + 3):

                        if w_i == width_itr + 1 or h_j == height_itr + 1:
                            continue

                        c_rgb = img.getpixel((h_j, w_i))

                        embed_capacity += (
                            self._pvd_table(abs(c_rgb[0] - ref_rgb[0]))
                            + self._pvd_table(abs(c_rgb[1] - ref_rgb[1]))
                            + self._pvd_table(abs(c_rgb[2] - ref_rgb[2]))
                        )

        return embed_capacity // 8

    def _replace_lsbs(self, pixel: int, bits: int, value: int) -> int:
        # Replace the given LS bits with given data
        mask = (1 << bits) - 1
        pixel &= ~mask
        return pixel | value

    def _get_lsbs(self, pixel: int, bits: int) -> int:
        # Get the given number of LS bits data
        mask = (1 << bits) - 1
        pixel &= mask
        return pixel

    def _embed_data(self, img: Image.Image, payload_bytes: bytes) -> Image.Image:
        """Embed the secret file into the carrier png image.

        Args:
            img: The cover image.
            payload_bytes: The secret txt file to embed.

        Returns:
            The encoded PNG image object with the secret file embedded.
        """

        embedded_ds = 0

        bits_reader = FileBitsReader(payload_bytes)

        img_height, img_width = img.size
        no_of_matrix_h = img_height // 3 - 1
        no_of_matrix_w = img_width // 3 - 1

        if no_of_matrix_h < 1 or no_of_matrix_w < 1 or len(img.getpixel((0, 0))) < 3:
            raise Exception("Invalid image, too small or not in RGB format")

        for height_itr in range(0, no_of_matrix_h * 3, 3):
            for width_itr in range(0, no_of_matrix_w * 3, 3):

                ref_rgb = img.getpixel((height_itr + 1, width_itr + 1))

                for h_j in range(height_itr, height_itr + 3):
                    for w_i in range(width_itr, width_itr + 3):

                        if w_i == width_itr + 1 or h_j == height_itr + 1:
                            continue

                        c_rgb = img.getpixel((h_j, w_i))
                        c_rgb_list = list(c_rgb)

                        done_embedding = False
                        for rgb in range(3):
                            bits_reqd = self._pvd_table(abs(c_rgb[rgb] - ref_rgb[rgb]))
                            embedded_ds += bits_reqd

                            # Get the required number of bits corresponding to
                            # the pixel difference from the current pixel and
                            # reference pixel from the input file.
                            ret_val = bits_reader.get_bits(bits_reqd)

                            # Replace the LSBs of the pixel with the file data.
                            c_rgb_list[rgb] = self._replace_lsbs(
                                c_rgb[rgb], ret_val[2], ret_val[1]
                            )
                            if ret_val[0] is True:
                                done_embedding = True
                                break

                        # Replace the pixel value with modified contents
                        img.putpixel((h_j, w_i), tuple(c_rgb_list))  # type: ignore

                        if done_embedding:
                            return img

        raise Exception("Failed to embed the data")

    def _extract_data(self, ref_img: Image.Image, pvd_img: Image.Image, output: str):
        """Extract the secret file from the encoded image.

        Args:
            ref_image: The encoded steganographic image.
            pvd_img: The original cover image.
            output: The output txt file.
        """
        bits_writer = FileBitsWriter(output)

        embedded_ds = 0

        ref_img_height, ref_img_width = pvd_img.size
        pvd_img_height, pvd_img_width = pvd_img.size

        if ref_img_height != pvd_img_height or ref_img_width != pvd_img_width:
            raise ValueError("Ref vs embedded image not matching")

        no_of_matrix_h = ref_img_height // 3 - 1
        no_of_matrix_w = ref_img_width // 3 - 1

        if (
            no_of_matrix_h < 1
            or no_of_matrix_w < 1
            or len(ref_img.getpixel((0, 0))) < 3
        ):
            return embedded_ds

        magic_extracted = False
        eof_reached = False
        encoded_size = 0

        for height_itr in range(0, no_of_matrix_h * 3, 3):
            for width_itr in range(0, no_of_matrix_w * 3, 3):

                ref_rgb = ref_img.getpixel((height_itr + 1, width_itr + 1))

                for h_j in range(height_itr, height_itr + 3):
                    for w_i in range(width_itr, width_itr + 3):

                        if w_i == width_itr + 1 or h_j == height_itr + 1:
                            continue

                        c_rgb = ref_img.getpixel((h_j, w_i))
                        pvd_c_rgb = pvd_img.getpixel((h_j, w_i))
                        # c_rgb_list = list(c_rgb)

                        for rgb in range(3):
                            bits_reqd = self._pvd_table(abs(c_rgb[rgb] - ref_rgb[rgb]))
                            embedded_ds += bits_reqd
                            data = self._get_lsbs(pvd_c_rgb[rgb], bits_reqd)
                            bits_writer.set_bits(eof_reached, bits_reqd, data)
                            if (
                                magic_extracted
                                and (encoded_size + PVD_HEADER_SIZE)
                                == bits_writer.bytes_wrote_to_file_so_far
                            ):
                                eof_reached = True

                            # If we have completed reading the header?
                            if (
                                bits_writer.bytes_wrote_to_file_so_far
                                >= (PVD_HEADER_SIZE)
                            ) and magic_extracted is False:
                                magic_extracted = True
                                magic = bits_writer.data[:PVD_HEADER_SIZE]
                                pvd_magic = magic[:4]
                                pvd_versn = magic[4:7]
                                # Check if the magic and version are matching
                                if pvd_magic != PVD_MAGIC or pvd_versn != PVD_VERSION:
                                    raise ValueError(
                                        "Invalid version or image... magic: "
                                        f"{pvd_magic} versn: {pvd_versn}"
                                    )
                                size_arr = magic[-4:]
                                # Parse the encoded data size in the image
                                encoded_size = (
                                    (size_arr[0] << 24)
                                    + (size_arr[1] << 16)
                                    + (size_arr[2] << 8)
                                    + (size_arr[3] << 0)
                                )

                            if eof_reached:
                                # If we have read all the data required then
                                # close the file
                                bits_writer.close_file()
                                return embedded_ds

        return -1

    def encode(self, carrier: Image.Image, payload: bytes) -> Image.Image:
        """Encode the secret txt file into the cover image using PVD.

        Args:
            carrier: The cover image.
            payload: The secret txt file to embed. Read with
                `stegobox.io.txt.read_bytes()`.
        """
        embed_cap = self._embed_capacity(carrier)
        s_f_size = len(payload)

        if embed_cap < s_f_size:
            raise ValueError(
                "ERROR: Secret file size is more than embedding capacity of image - "
                f"Capacity: {embed_cap} bytes, secret file size: {s_f_size} bytes."
            )

        return self._embed_data(carrier, payload)

    def decode(self, _: str) -> None:
        """
        PVDStego requires a reference image to decode, use `decode_with_original()` and
        pass a reference to the original image instead of this `decode()` function.
        """
        raise ValueError("Decoding requires the original carrier image as well.")

    def decode_with_original(
        self, carrier: Image.Image, original: Image.Image, output: str
    ) -> None:
        """Using both the encoded steganographic image and the original cover image for
        extracting the payload with PVD.

        The reason why we need the class FileBitsWriter rather than the function
        `io.txt.write_bytes()` is that the output file is generated as the encoded
        steganographic image is decoded.

        Args:
            carrier: The encoded steganographic image.
            original: The original cover image.
            output: The output txt file.
        """
        self._extract_data(original, carrier, output)

encode(carrier, payload)

Encode the secret txt file into the cover image using PVD.

Parameters:

Name Type Description Default
carrier Image

The cover image.

required
payload bytes

The secret txt file to embed. Read with stegobox.io.txt.read_bytes().

required
Source code in stegobox/codec/pvd_stego.py
def encode(self, carrier: Image.Image, payload: bytes) -> Image.Image:
    """Encode the secret txt file into the cover image using PVD.

    Args:
        carrier: The cover image.
        payload: The secret txt file to embed. Read with
            `stegobox.io.txt.read_bytes()`.
    """
    embed_cap = self._embed_capacity(carrier)
    s_f_size = len(payload)

    if embed_cap < s_f_size:
        raise ValueError(
            "ERROR: Secret file size is more than embedding capacity of image - "
            f"Capacity: {embed_cap} bytes, secret file size: {s_f_size} bytes."
        )

    return self._embed_data(carrier, payload)

decode(_)

PVDStego requires a reference image to decode, use decode_with_original() and pass a reference to the original image instead of this decode() function.

Source code in stegobox/codec/pvd_stego.py
def decode(self, _: str) -> None:
    """
    PVDStego requires a reference image to decode, use `decode_with_original()` and
    pass a reference to the original image instead of this `decode()` function.
    """
    raise ValueError("Decoding requires the original carrier image as well.")

decode_with_original(carrier, original, output)

Using both the encoded steganographic image and the original cover image for extracting the payload with PVD.

The reason why we need the class FileBitsWriter rather than the function io.txt.write_bytes() is that the output file is generated as the encoded steganographic image is decoded.

Parameters:

Name Type Description Default
carrier Image

The encoded steganographic image.

required
original Image

The original cover image.

required
output str

The output txt file.

required
Source code in stegobox/codec/pvd_stego.py
def decode_with_original(
    self, carrier: Image.Image, original: Image.Image, output: str
) -> None:
    """Using both the encoded steganographic image and the original cover image for
    extracting the payload with PVD.

    The reason why we need the class FileBitsWriter rather than the function
    `io.txt.write_bytes()` is that the output file is generated as the encoded
    steganographic image is decoded.

    Args:
        carrier: The encoded steganographic image.
        original: The original cover image.
        output: The output txt file.
    """
    self._extract_data(original, carrier, output)