Skip to content

MP4HidePNG

stegobox.codec.MP4HidePNG

Bases: BaseCodec

MP4HidePNG: A minimal video steganography tool written in Python. It uses a variation of the LSB (least significant bit) steganography algorithm to hide an image message in a carrier video file.

  • Created by: Jiayao Yang
  • Created time: 2022/10/22

Source code derived from: dalisyron/video-stegano.

Source code in stegobox/codec/mp4_hide_png.py
class MP4HidePNG(BaseCodec):
    """MP4HidePNG: A minimal video steganography tool written in python. The project
    uses a variation of the LSB steganography algorithm to hide an image message in a
    carrier video file.

    * Created by: Jiayao Yang
    * Created time: 2022/10/22

    Source code derived from:
    [dalisyron/video-stegano](https://github.com/dalisyron/video-stegano).
    """

    def __init__(self, tmp_frame: str = "./temp") -> None:
        """Initialise the codec.

        Args:
            tmp_frame: Directory that holds the intermediate PNG frames while
                encoding and decoding. Any existing content of this directory is
                deleted by `encode_write_video` and `decode`.
        """
        super().__init__()
        self.tmp_frame = tmp_frame

    def encode(self, _):
        raise NotImplementedError("Use method encode_write_video instead")

    def encode_write_video(
        self, carrier: cv2.VideoCapture, payload: np.ndarray, output: str
    ) -> None:
        """Hide a PNG image in the least significant bits of an MP4 video.

        The embedded bit stream is a 64-bit header (16-bit width, 16-bit height
        and 32-bit ``width * height``) followed by every payload byte, most
        significant bit first. The stream overwrites the LSB of each color value
        of the leading carrier frames (unused LSBs in those frames are cleared);
        the remaining frames are copied unchanged. All frames are written as PNG
        files into ``self.tmp_frame`` and then muxed into ``output`` with ffmpeg.

        Args:
            carrier: Input Carrier video in format mp4. Read with
                `stegobox.io.video.read()`. It is released once frames have been
                read, whether or not encoding succeeds.
            payload: Payload image PNG with three color channels. Read with
                `stegobox.io.image.read_cv2()`.
            output: Path to write the encoded video in format mp4.

        Raises:
            ValueError: If the payload is not a ``(height, width, 3)`` array, if
                its dimensions do not fit the 16-bit header fields, or if the
                message does not fit into the carrier frames that were read.
            Exception: If the carrier runs out of frames before the required
                number has been read.
        """
        # `decode` always rebuilds a three-channel image, so anything else would
        # produce a video that cannot be decoded correctly.
        if payload.ndim != 3 or payload.shape[2] != 3:
            raise ValueError(
                "Payload must be a three-channel image of shape (height, width, 3)"
            )
        msg_height, msg_width = payload.shape[:2]
        if msg_height > 0xFFFF or msg_width > 0xFFFF:
            raise ValueError("Payload width and height must each fit in 16 bits")

        try:
            vid_fps = carrier.get(cv2.CAP_PROP_FPS)
            req_frame_count = self._required_frame_count(carrier, payload)

            # Header layout: width (16 bits), height (16 bits), width*height (32).
            header = (
                f"{msg_width:016b}{msg_height:016b}"
                f"{msg_width * msg_height:032b}"
            )
            header_bits = np.array([bit == "1" for bit in header], dtype=np.uint8)
            # unpackbits emits the most significant bit of every byte first.
            payload_bits = np.unpackbits(payload.astype(np.uint8).reshape(-1))
            message_bits = np.concatenate((header_bits, payload_bits))

            frames = []
            for _ in range(req_frame_count):
                ret, frame = carrier.read()
                if not ret:
                    raise Exception("Error in extracting video frames")
                frames.append(frame)

            stego_frames = np.ascontiguousarray(np.stack(frames), dtype=np.uint8)
            # Flat view onto the frames: edits below modify `stego_frames`.
            carrier_values = stego_frames.reshape(-1)
            if message_bits.size > carrier_values.size:
                raise ValueError(
                    "Payload does not fit into the carrier frames: "
                    f"{message_bits.size} bits needed, "
                    f"{carrier_values.size} available"
                )

            # Clear every LSB of the used frames, then set the message bits.
            carrier_values &= np.uint8(0xFE)
            carrier_values[: message_bits.size] |= message_bits

            # create dirs
            if os.path.exists(self.tmp_frame):
                self._remove_directory_content(self.tmp_frame)
            else:
                self._make_dir(self.tmp_frame)

            # Compression level 0 keeps the PNG writes fast; PNG is lossless
            # either way, which is what preserves the embedded LSBs.
            png_params = [cv2.IMWRITE_PNG_COMPRESSION, 0]
            cnt = 0
            for cnt, frame in enumerate(stego_frames, start=1):
                cv2.imwrite(
                    os.path.join(self.tmp_frame, f"frame{cnt:09d}.png"),
                    frame,
                    png_params,
                )

            print("Starting write back...")
            while True:
                ret, frame = carrier.read()
                if not ret:
                    break
                cnt += 1
                cv2.imwrite(
                    os.path.join(self.tmp_frame, f"frame{cnt:09d}.png"),
                    frame,
                    png_params,
                )

            # Create the parent directory of `output`; a bare filename has none.
            output_dir = os.path.dirname(output)
            if output_dir:
                self._make_dir(output_dir)

            video = ffmpeg.input(
                os.path.join(self.tmp_frame, "frame%09d.png"),
                framerate=str(vid_fps),
            )
            out = ffmpeg.output(
                video.video, output, acodec="copy", vcodec="copy"
            ).overwrite_output()
            out.run()
        finally:
            carrier.release()

        print(f"Done. Stegano video created and saved to {output}.")

    def decode(self, carrier: str) -> np.ndarray:
        """Decode the secret image from the encoded video.

        The video is split into PNG frames inside ``self.tmp_frame`` with ffmpeg.
        The least significant bit of every color value is read frame by frame:
        the first 64 bits form the header (16-bit width, 16-bit height, 32-bit
        ``width * height``) and the following ``width * height * 3`` bytes are
        the image data, most significant bit first. ``self.tmp_frame`` is removed
        before returning, also when decoding fails.

        Args:
            carrier: Path to the encoded video mp4 with the secret image embedded.

        Returns:
            The secret image as a ``uint8`` array of shape
            ``(height, width, 3)``. Save with `stegobox.io.image.write_cv2()`.

        Raises:
            Exception: If no frame can be read, the header is inconsistent, or
                the video ends before the whole message has been read.
        """
        if os.path.exists(self.tmp_frame):
            self._remove_directory_content(self.tmp_frame)
        else:
            self._make_dir(self.tmp_frame)

        try:
            video = ffmpeg.input(carrier)
            out = ffmpeg.output(
                video.video, os.path.join(self.tmp_frame, "frame%09d.png")
            ).overwrite_output()
            out.run()

            lsb_chunks = []
            collected = 0
            message_length = None
            message_width = 0
            message_height = 0
            frame_index = 1

            # Stop as soon as enough bits for header + image have been collected.
            while message_length is None or collected < message_length:
                frame_path = os.path.join(
                    self.tmp_frame, f"frame{frame_index:09d}.png"
                )
                if not os.path.exists(frame_path):
                    break

                frame = cv2.imread(frame_path)
                if frame is None:
                    raise Exception(f"Could not read extracted frame {frame_path}.")

                lsb = np.bitwise_and(frame.reshape(-1), 1).astype(np.uint8)
                lsb_chunks.append(lsb)
                collected += lsb.size

                if frame_index == 1:
                    if lsb.size < 64:
                        raise Exception(
                            "Invalid stegano video, "
                            f"no valid message found in {carrier}."
                        )
                    # Plain Python ints avoid fixed-width overflow when shifting.
                    header = lsb[:64].tolist()
                    width = self._bool2int(header[:16])
                    height = self._bool2int(header[16:32])
                    mult = self._bool2int(header[32:64])

                    # The third header field doubles as a sanity check.
                    if height * width != mult:
                        raise Exception(
                            "Invalid stegano video, "
                            f"no valid message found in {carrier}."
                        )

                    message_length = mult * 8 * 3 + 64
                    message_height = height
                    message_width = width

                frame_index += 1

            if message_length is None:
                raise Exception(
                    f"Invalid stegano video, no valid message found in {carrier}."
                )
            if collected < message_length:
                raise Exception(
                    f"Invalid stegano video, {carrier} ends before the hidden "
                    "message is complete."
                )

            print("Extracting...")
            payload_bits = np.concatenate(lsb_chunks)[64:message_length]
            # packbits treats the first bit of every group of 8 as the MSB.
            message = np.packbits(payload_bits).reshape(
                (message_height, message_width, 3)
            )
            print("Done. Hidden message extracted.")
            return message
        finally:
            shutil.rmtree(self.tmp_frame, ignore_errors=True)

    def _required_frame_count(self, video_capture, image):
        """Estimate how many carrier frames are needed to hold the message.

        The estimate divides eight bits per image pixel plus the 64-bit header
        by the number of pixels in one video frame and rounds up.

        Args:
            video_capture: Capture object queried for the frame height and width.
            image: Payload array; only its first two shape entries are used.

        Returns:
            The number of frames as an int.
        """
        pixels_per_frame = video_capture.get(
            cv2.CAP_PROP_FRAME_HEIGHT  # type: ignore
        ) * video_capture.get(
            cv2.CAP_PROP_FRAME_WIDTH  # type: ignore
        )

        rows = image.shape[0]
        cols = image.shape[1]

        header_bits = 64
        needed = rows * cols * 8 + header_bits
        return int(ceil(needed / pixels_per_frame))

    def _bin_rep(self, num):
        return "{0:b}".format(num)

    def _remove_directory_content(self, folder):
        for filename in os.listdir(folder):
            file_path = os.path.join(folder, filename)
            try:
                if os.path.isfile(file_path) or os.path.islink(file_path):
                    os.unlink(file_path)
                elif os.path.isdir(file_path):
                    shutil.rmtree(file_path)
            except Exception as e:
                print("Failed to delete %s. Reason: %s" % (file_path, e))

    def _make_dir(self, directory):
        pathlib.Path(directory).mkdir(parents=True, exist_ok=True)

    def _bool2int(self, x):
        x = x[::-1]
        y = 0
        for i, j in enumerate(x):
            y += j << i
        return y

encode_write_video(carrier, payload, output)

Simple LSB steganography using MP4 video.

Parameters:

Name Type Description Default
carrier VideoCapture

Input Carrier video in format mp4. Read with stegobox.io.video.read().

required
payload ndarray

Payload image PNG. Read with stegobox.io.image.read_cv2().

required
output str

Path to write the encoded video in format mp4.

required
Source code in stegobox/codec/mp4_hide_png.py
def encode_write_video(
    self, carrier: cv2.VideoCapture, payload: np.ndarray, output: str
) -> None:
    """Hide a PNG image in the least significant bits of an MP4 video.

    The embedded bit stream is a 64-bit header (16-bit width, 16-bit height
    and 32-bit ``width * height``) followed by every payload byte, most
    significant bit first. The stream overwrites the LSB of each color value
    of the leading carrier frames (unused LSBs in those frames are cleared);
    the remaining frames are copied unchanged. All frames are written as PNG
    files into ``self.tmp_frame`` and then muxed into ``output`` with ffmpeg.

    Args:
        carrier: Input Carrier video in format mp4. Read with
            `stegobox.io.video.read()`. It is released once frames have been
            read, whether or not encoding succeeds.
        payload: Payload image PNG with three color channels. Read with
            `stegobox.io.image.read_cv2()`.
        output: Path to write the encoded video in format mp4.

    Raises:
        ValueError: If the payload is not a ``(height, width, 3)`` array, if
            its dimensions do not fit the 16-bit header fields, or if the
            message does not fit into the carrier frames that were read.
        Exception: If the carrier runs out of frames before the required
            number has been read.
    """
    # `decode` always rebuilds a three-channel image, so anything else would
    # produce a video that cannot be decoded correctly.
    if payload.ndim != 3 or payload.shape[2] != 3:
        raise ValueError(
            "Payload must be a three-channel image of shape (height, width, 3)"
        )
    msg_height, msg_width = payload.shape[:2]
    if msg_height > 0xFFFF or msg_width > 0xFFFF:
        raise ValueError("Payload width and height must each fit in 16 bits")

    try:
        vid_fps = carrier.get(cv2.CAP_PROP_FPS)
        req_frame_count = self._required_frame_count(carrier, payload)

        # Header layout: width (16 bits), height (16 bits), width*height (32).
        header = (
            f"{msg_width:016b}{msg_height:016b}"
            f"{msg_width * msg_height:032b}"
        )
        header_bits = np.array([bit == "1" for bit in header], dtype=np.uint8)
        # unpackbits emits the most significant bit of every byte first.
        payload_bits = np.unpackbits(payload.astype(np.uint8).reshape(-1))
        message_bits = np.concatenate((header_bits, payload_bits))

        frames = []
        for _ in range(req_frame_count):
            ret, frame = carrier.read()
            if not ret:
                raise Exception("Error in extracting video frames")
            frames.append(frame)

        stego_frames = np.ascontiguousarray(np.stack(frames), dtype=np.uint8)
        # Flat view onto the frames: edits below modify `stego_frames`.
        carrier_values = stego_frames.reshape(-1)
        if message_bits.size > carrier_values.size:
            raise ValueError(
                "Payload does not fit into the carrier frames: "
                f"{message_bits.size} bits needed, "
                f"{carrier_values.size} available"
            )

        # Clear every LSB of the used frames, then set the message bits.
        carrier_values &= np.uint8(0xFE)
        carrier_values[: message_bits.size] |= message_bits

        # create dirs
        if os.path.exists(self.tmp_frame):
            self._remove_directory_content(self.tmp_frame)
        else:
            self._make_dir(self.tmp_frame)

        # Compression level 0 keeps the PNG writes fast; PNG is lossless
        # either way, which is what preserves the embedded LSBs.
        png_params = [cv2.IMWRITE_PNG_COMPRESSION, 0]
        cnt = 0
        for cnt, frame in enumerate(stego_frames, start=1):
            cv2.imwrite(
                os.path.join(self.tmp_frame, f"frame{cnt:09d}.png"),
                frame,
                png_params,
            )

        print("Starting write back...")
        while True:
            ret, frame = carrier.read()
            if not ret:
                break
            cnt += 1
            cv2.imwrite(
                os.path.join(self.tmp_frame, f"frame{cnt:09d}.png"),
                frame,
                png_params,
            )

        # Create the parent directory of `output`; a bare filename has none.
        output_dir = os.path.dirname(output)
        if output_dir:
            self._make_dir(output_dir)

        video = ffmpeg.input(
            os.path.join(self.tmp_frame, "frame%09d.png"),
            framerate=str(vid_fps),
        )
        out = ffmpeg.output(
            video.video, output, acodec="copy", vcodec="copy"
        ).overwrite_output()
        out.run()
    finally:
        carrier.release()

    print(f"Done. Stegano video created and saved to {output}.")

decode(carrier)

Decode the secret image from the encoded video.

Parameters:

Name Type Description Default
carrier str

Path to the encoded video mp4 with the secret image embedded.

required

Returns:

Type Description
ndarray

The secret image in format png. Save with stegobox.io.image.write_cv2().

Source code in stegobox/codec/mp4_hide_png.py
def decode(self, carrier: str) -> np.ndarray:
    """Decode the secret image from the encoded video.

    The video is split into PNG frames inside ``self.tmp_frame`` with ffmpeg.
    The least significant bit of every color value is read frame by frame:
    the first 64 bits form the header (16-bit width, 16-bit height, 32-bit
    ``width * height``) and the following ``width * height * 3`` bytes are
    the image data, most significant bit first. ``self.tmp_frame`` is removed
    before returning, also when decoding fails.

    Args:
        carrier: Path to the encoded video mp4 with the secret image embedded.

    Returns:
        The secret image as a ``uint8`` array of shape
        ``(height, width, 3)``. Save with `stegobox.io.image.write_cv2()`.

    Raises:
        Exception: If no frame can be read, the header is inconsistent, or
            the video ends before the whole message has been read.
    """
    if os.path.exists(self.tmp_frame):
        self._remove_directory_content(self.tmp_frame)
    else:
        self._make_dir(self.tmp_frame)

    try:
        video = ffmpeg.input(carrier)
        out = ffmpeg.output(
            video.video, os.path.join(self.tmp_frame, "frame%09d.png")
        ).overwrite_output()
        out.run()

        lsb_chunks = []
        collected = 0
        message_length = None
        message_width = 0
        message_height = 0
        frame_index = 1

        # Stop as soon as enough bits for header + image have been collected.
        while message_length is None or collected < message_length:
            frame_path = os.path.join(
                self.tmp_frame, f"frame{frame_index:09d}.png"
            )
            if not os.path.exists(frame_path):
                break

            frame = cv2.imread(frame_path)
            if frame is None:
                raise Exception(f"Could not read extracted frame {frame_path}.")

            lsb = np.bitwise_and(frame.reshape(-1), 1).astype(np.uint8)
            lsb_chunks.append(lsb)
            collected += lsb.size

            if frame_index == 1:
                if lsb.size < 64:
                    raise Exception(
                        "Invalid stegano video, "
                        f"no valid message found in {carrier}."
                    )
                # Plain Python ints avoid fixed-width overflow when shifting.
                header = lsb[:64].tolist()
                width = self._bool2int(header[:16])
                height = self._bool2int(header[16:32])
                mult = self._bool2int(header[32:64])

                # The third header field doubles as a sanity check.
                if height * width != mult:
                    raise Exception(
                        "Invalid stegano video, "
                        f"no valid message found in {carrier}."
                    )

                message_length = mult * 8 * 3 + 64
                message_height = height
                message_width = width

            frame_index += 1

        if message_length is None:
            raise Exception(
                f"Invalid stegano video, no valid message found in {carrier}."
            )
        if collected < message_length:
            raise Exception(
                f"Invalid stegano video, {carrier} ends before the hidden "
                "message is complete."
            )

        print("Extracting...")
        payload_bits = np.concatenate(lsb_chunks)[64:message_length]
        # packbits treats the first bit of every group of 8 as the MSB.
        message = np.packbits(payload_bits).reshape(
            (message_height, message_width, 3)
        )
        print("Done. Hidden message extracted.")
        return message
    finally:
        shutil.rmtree(self.tmp_frame, ignore_errors=True)