Skip to content

LSBMP4

stegobox.codec.LSBMP4

Bases: BaseCodec

LSBMP4: Hide text into MP4 using LSB.

  • Created by: Jiayao Yang
  • Created time: 2022/10/17

Source code derived from: MohamedSalahApdElzaher/video_steganography

Source code in stegobox/codec/lsb_mp4.py
class LSBMP4(BaseCodec):
    """
    LSBMP4: Hide text into MP4 using LSB.

    * Created by: Jiayao Yang
    * Created time: 2022/10/17

    Source code derived from:
    [MohamedSalahApdElzaher/video_steganography](https://github.com/MohamedSalahApdElzaher/video_steganography)
    """

    def __init__(self, tmp_frame: str = "./temp") -> None:
        super().__init__()
        self.tmp_frame = tmp_frame

    def lsb_hide(
        self,
        input_image: str,
        message: str,
        encoding: str = "UTF-8",
        shift: int = 0,
        auto_convert_rgb: bool = False,
    ):
        """Hide a message (string) in an image with LSB (Least Significant Bit)."""
        message_length = len(message)
        assert message_length != 0, "message length is zero"

        img = io.image.read(input_image)

        if img.mode not in ["RGB", "RGBA"]:
            if not auto_convert_rgb:
                print("The mode of the image is not RGB. Mode is {}".format(img.mode))
                answer = input("Convert the image to RGB ? [Y / n]\n") or "Y"
                if answer.lower() == "n":
                    raise Exception("Not a RGB image.")
            img = img.convert("RGB")

        encoded = img.copy()
        width, height = img.size
        index = 0

        message = str(message_length) + ":" + str(message)
        message_bits = "".join(
            [
                bin(ord(x))[2:].rjust({"UTF-8": 8, "UTF-32LE": 32}[encoding], "0")
                for x in message
            ]
        )
        message_bits += "0" * ((3 - (len(message_bits) % 3)) % 3)

        npixels = width * height
        len_message_bits = len(message_bits)
        if len_message_bits > npixels * 3:
            raise Exception(
                "The message you want to hide is too long: {}".format(message_length)
            )
        for row in range(height):
            for col in range(width):
                if shift != 0:
                    shift -= 1
                    continue
                if index + 3 <= len_message_bits:

                    # Get the colour component.
                    pixel = img.getpixel((col, row))
                    r = pixel[0]
                    g = pixel[1]
                    b = pixel[2]

                    # Change the Least Significant Bit of each colour component.
                    r = r & ~1 | int(message_bits[index])
                    g = g & ~1 | int(message_bits[index + 1])
                    b = b & ~1 | int(message_bits[index + 2])

                    # Save the new pixel
                    if img.mode == "RGBA":
                        encoded.putpixel((col, row), (r, g, b, pixel[3]))
                    else:
                        encoded.putpixel((col, row), (r, g, b))

                    index += 3
                else:
                    img.close()
                    return encoded

    def lsb_reveal(
        self,
        input_image: str,
        encoding: str = "UTF-8",
        shift: int = 0,
    ):
        """Find a message in an image (with the LSB technique)."""
        img = Image.open(input_image)
        width, height = img.size
        buff, count = 0, 0
        bitab = []
        limit = None
        for row in range(height):
            for col in range(width):
                if shift != 0:
                    shift -= 1
                    continue
                # pixel = [r, g, b] or [r,g,b,a]
                pixel = img.getpixel((col, row))
                if img.mode == "RGBA":
                    pixel = pixel[:3]  # ignore the alpha
                for color in pixel:
                    buff += (color & 1) << (
                        {"UTF-8": 8, "UTF-32LE": 32}[encoding] - 1 - count
                    )
                    count += 1
                    if count == {"UTF-8": 8, "UTF-32LE": 32}[encoding]:
                        bitab.append(chr(buff))
                        buff, count = 0, 0
                        if bitab[-1] == ":" and limit is None:
                            try:
                                limit = int("".join(bitab[:-1]))
                            except Exception:
                                pass

                if len(bitab) - len(str(limit)) - 1 == limit:
                    img.close()
                    return "".join(bitab)[len(str(limit)) + 1 :]

    # Extract frames (i.e digital imgs from video) in directory ./tmp
    def frame_extraction(self, video):
        if not os.path.exists(self.tmp_frame):
            os.makedirs(self.tmp_frame)
        # temp_folder = "./tempfile"
        print("tempfile directory is created")

        # capture the video frame by frame
        count = 0
        vidcap = video

        while True:
            # This method takes no arguments and returns a tuple.
            # The first returned value is a Boolean indicating
            # if a frame was read correctly (True) or not (False).
            success, image = vidcap.read()
            if not success:
                break
            # save image
            cv2.imwrite(os.path.join(self.tmp_frame, "{:d}.png".format(count)), image)
            count += 1

    # split input text to be hidden
    def split_string(self, s_str, count=10):
        per_c = math.ceil(len(s_str) / count)
        c_cout = 0
        out_str = ""
        split_list = []
        for s in s_str:
            out_str += s
            c_cout += 1
            if c_cout == per_c:
                split_list.append(out_str)
                out_str = ""
                c_cout = 0
        if c_cout != 0:
            split_list.append(out_str)
        return split_list

    # hide split text in video frames
    def encode_string(self, input_string):
        split_string_list = self.split_string(input_string)
        for i in range(0, len(split_string_list)):
            f_name = "{}{}.png".format(self.tmp_frame + "/", i)
            # LSB-Steganography is a steganography technique in which
            # we hide messages inside an image
            # by replacing Least significant bit of image with the bits of
            # message to be hidden.
            secret_enc = self.lsb_hide(f_name, split_string_list[i])
            if secret_enc is not None:
                secret_enc.save(f_name)
            else:
                raise ValueError("Sorry, carrier should not be None image.")
            # print("frame {} holds {}".format(f_name, split_string_list[i]))

    def encode(
        self, carrier: cv2.VideoCapture, payload: str
    ) -> tuple[list[numpy.ndarray], int, tuple[int, int]]:
        """Simple LSB steganography using JPEG images.

        Args:
            carrier: Input Carrier video in format mp4
                     Read with `stegobox.io.video.read()`.
            payload: Secret payload message.

        Returns:
            tuple[list[numpy.ndarray], int, tuple[int, int]]
            The encrypted steganography image.
        """
        fps = carrier.get(cv2.CAP_PROP_FPS)
        self.frame_extraction(carrier)
        self.encode_string(payload)

        img_array = []
        size = (0, 0)
        for filename in glob.glob(self.tmp_frame + "/*.png"):
            img = cv2.imread(filename)
            height, width, _ = img.shape
            size = (width, height)
            img_array.append(img)
        return img_array, fps, size

    def decode(self, carrier: cv2.VideoCapture) -> str:
        """Decode the secret payload from the carrier video

        Args:
            carrier: Carrier video in format mp4. Read with `stegobox.io.video.read()`.

        Returns:
            str: The decoded payload (secret message).
        """
        self.frame_extraction(carrier)
        secret = []
        # root = "./tempfile/"
        for i in range(len(os.listdir(self.tmp_frame))):
            f_name = "{}{}.png".format(self.tmp_frame + "/", i)
            secret_dec = self.lsb_reveal(f_name)
            if secret_dec is None:
                break
            secret.append(secret_dec)
        depayload = "".join(secret)
        if os.path.exists(self.tmp_frame):
            shutil.rmtree(self.tmp_frame)
        print("tempfile directory is deleted")
        return depayload

lsb_hide(input_image, message, encoding='UTF-8', shift=0, auto_convert_rgb=False)

Hide a message (string) in an image with LSB (Least Significant Bit).

Source code in stegobox/codec/lsb_mp4.py
def lsb_hide(
    self,
    input_image: str,
    message: str,
    encoding: str = "UTF-8",
    shift: int = 0,
    auto_convert_rgb: bool = False,
):
    """Hide a message (string) in an image with LSB (Least Significant Bit)."""
    message_length = len(message)
    assert message_length != 0, "message length is zero"

    img = io.image.read(input_image)

    if img.mode not in ["RGB", "RGBA"]:
        if not auto_convert_rgb:
            print("The mode of the image is not RGB. Mode is {}".format(img.mode))
            answer = input("Convert the image to RGB ? [Y / n]\n") or "Y"
            if answer.lower() == "n":
                raise Exception("Not a RGB image.")
        img = img.convert("RGB")

    encoded = img.copy()
    width, height = img.size
    index = 0

    message = str(message_length) + ":" + str(message)
    message_bits = "".join(
        [
            bin(ord(x))[2:].rjust({"UTF-8": 8, "UTF-32LE": 32}[encoding], "0")
            for x in message
        ]
    )
    message_bits += "0" * ((3 - (len(message_bits) % 3)) % 3)

    npixels = width * height
    len_message_bits = len(message_bits)
    if len_message_bits > npixels * 3:
        raise Exception(
            "The message you want to hide is too long: {}".format(message_length)
        )
    for row in range(height):
        for col in range(width):
            if shift != 0:
                shift -= 1
                continue
            if index + 3 <= len_message_bits:

                # Get the colour component.
                pixel = img.getpixel((col, row))
                r = pixel[0]
                g = pixel[1]
                b = pixel[2]

                # Change the Least Significant Bit of each colour component.
                r = r & ~1 | int(message_bits[index])
                g = g & ~1 | int(message_bits[index + 1])
                b = b & ~1 | int(message_bits[index + 2])

                # Save the new pixel
                if img.mode == "RGBA":
                    encoded.putpixel((col, row), (r, g, b, pixel[3]))
                else:
                    encoded.putpixel((col, row), (r, g, b))

                index += 3
            else:
                img.close()
                return encoded

lsb_reveal(input_image, encoding='UTF-8', shift=0)

Find a message in an image (with the LSB technique).

Source code in stegobox/codec/lsb_mp4.py
def lsb_reveal(
    self,
    input_image: str,
    encoding: str = "UTF-8",
    shift: int = 0,
):
    """Find a message in an image (with the LSB technique)."""
    img = Image.open(input_image)
    width, height = img.size
    buff, count = 0, 0
    bitab = []
    limit = None
    for row in range(height):
        for col in range(width):
            if shift != 0:
                shift -= 1
                continue
            # pixel = [r, g, b] or [r,g,b,a]
            pixel = img.getpixel((col, row))
            if img.mode == "RGBA":
                pixel = pixel[:3]  # ignore the alpha
            for color in pixel:
                buff += (color & 1) << (
                    {"UTF-8": 8, "UTF-32LE": 32}[encoding] - 1 - count
                )
                count += 1
                if count == {"UTF-8": 8, "UTF-32LE": 32}[encoding]:
                    bitab.append(chr(buff))
                    buff, count = 0, 0
                    if bitab[-1] == ":" and limit is None:
                        try:
                            limit = int("".join(bitab[:-1]))
                        except Exception:
                            pass

            if len(bitab) - len(str(limit)) - 1 == limit:
                img.close()
                return "".join(bitab)[len(str(limit)) + 1 :]

encode(carrier, payload)

Simple LSB steganography using JPEG images.

Parameters:

Name Type Description Default
carrier VideoCapture

Input Carrier video in format mp4 Read with stegobox.io.video.read().

required
payload str

Secret payload message.

required

Returns:

Type Description
list[ndarray]

tuple[list[numpy.ndarray], int, tuple[int, int]]

int

The encrypted steganography image.

Source code in stegobox/codec/lsb_mp4.py
def encode(
    self, carrier: cv2.VideoCapture, payload: str
) -> tuple[list[numpy.ndarray], int, tuple[int, int]]:
    """Simple LSB steganography using JPEG images.

    Args:
        carrier: Input Carrier video in format mp4
                 Read with `stegobox.io.video.read()`.
        payload: Secret payload message.

    Returns:
        tuple[list[numpy.ndarray], int, tuple[int, int]]
        The encrypted steganography image.
    """
    fps = carrier.get(cv2.CAP_PROP_FPS)
    self.frame_extraction(carrier)
    self.encode_string(payload)

    img_array = []
    size = (0, 0)
    for filename in glob.glob(self.tmp_frame + "/*.png"):
        img = cv2.imread(filename)
        height, width, _ = img.shape
        size = (width, height)
        img_array.append(img)
    return img_array, fps, size

decode(carrier)

Decode the secret payload from the carrier video

Parameters:

Name Type Description Default
carrier VideoCapture

Carrier video in format mp4. Read with stegobox.io.video.read().

required

Returns:

Name Type Description
str str

The decoded payload (secret message).

Source code in stegobox/codec/lsb_mp4.py
def decode(self, carrier: cv2.VideoCapture) -> str:
    """Decode the secret payload from the carrier video

    Args:
        carrier: Carrier video in format mp4. Read with `stegobox.io.video.read()`.

    Returns:
        str: The decoded payload (secret message).
    """
    self.frame_extraction(carrier)
    secret = []
    # root = "./tempfile/"
    for i in range(len(os.listdir(self.tmp_frame))):
        f_name = "{}{}.png".format(self.tmp_frame + "/", i)
        secret_dec = self.lsb_reveal(f_name)
        if secret_dec is None:
            break
        secret.append(secret_dec)
    depayload = "".join(secret)
    if os.path.exists(self.tmp_frame):
        shutil.rmtree(self.tmp_frame)
    print("tempfile directory is deleted")
    return depayload