Skip to content

VideoHideVideoLsb

stegobox.codec.VideoHideVideoLsb

Bases: BaseCodec

  • To encode: The carrier video is divided into a video part and an audio part, and the payload video is also divided into a video part and an audio part. For the audio part, use simple audio LSB (4-bit) replacement steganography in WAV audio. For the video part, divide it into frames (PNG images) and use LSB (2-bit) PNG image steganography. Note: two copies of each frame are made, the first to store the 1st and 2nd bits of the payload and the second to store the 3rd and 4th bits of the payload. So the number of frames in the stego video is twice the number of carrier frames used.

  • To decode: The carrier video is divided into a video part and an audio part. For the audio part, the four least significant bits of each audio byte are read off. For the video part, the 1st and 2nd bits are extracted from odd frames and the 3rd and 4th bits are extracted from even frames.

Source: kelvins/steganography

Source code in stegobox/codec/videohidevideo_lsb.py
class VideoHideVideoLsb(BaseCodec):
    """Hide a payload video inside a carrier video with LSB replacement.

    * To encode: the audio tracks of the carrier and of the payload are extracted to
      WAV files and merged byte by byte: the high nibble of every carrier byte is
      kept and its low nibble is replaced by the high nibble of the payload byte.
      The video tracks are processed frame by frame. Each payload frame is resized
      (only when it is larger than the carrier frame) and padded with black to the
      carrier frame size, then two PNG frames are written for it: the first stores
      payload bits 4-5 and the second stores payload bits 6-7 in the two least
      significant bits of the carrier frame. The stego video therefore contains two
      frames per payload frame and is assembled at twice the carrier frame rate.

    * To decode: the low nibble of every audio byte is shifted back into the high
      nibble. For the video part, odd frames provide payload bits 4-5 and even
      frames provide payload bits 6-7; one output frame is written per frame pair.

    Source: [kelvins/steganography](https://github.com/kelvins/steganography)
    """

    def __init__(self) -> None:
        """Hide video in video with LSB."""
        # Index of the last PNG frame written by `encode_write_video()`.
        self.fn = 0

    def _resize(
        self, src: np.ndarray, w: int = 0, h: int = 0, ar: float = 0
    ) -> np.ndarray:
        """Resize a frame to an explicit size, or to one side plus an aspect ratio.

        Args:
            src: The frame to resize.
            w: Target width in pixels. 0 means "derive it from `h` and `ar`".
            h: Target height in pixels. 0 means "derive it from `w` and `ar`".
            ar: Width / height ratio, used when only one of `w` and `h` is given.

        Returns:
            The resized frame.

        Raises:
            ValueError: If neither `w` nor `h` is given, or if only one of them is
                given and `ar` is 0.
        """
        # Exactly one branch applies; the previous implementation fell through all
        # of them and called cv2.resize with a zero-sized dimension.
        if w != 0 and h != 0:
            return cv2.resize(src, (w, h))
        if w == 0 and h == 0:
            raise ValueError("At least one of w and h must be non-zero.")
        if ar == 0:
            raise ValueError(
                "ar must be non-zero when only one of w and h is given."
            )
        if w != 0:
            # Never ask OpenCV for a zero-pixel side.
            return cv2.resize(src, (w, max(1, int(w / ar))))
        return cv2.resize(src, (max(1, int(h * ar)), h))

    def encode(self, _):
        raise NotImplementedError("Use encode_write_video() instead.")

    def encode_write_video(
        self,
        carrier: tuple[cv2.VideoCapture, ffmpeg.nodes.FilterableStream],
        payload: tuple[cv2.VideoCapture, ffmpeg.nodes.FilterableStream],
        output_path: str,
    ) -> None:
        """Hide the payload video in the carrier video and write the result.

        Both videos are expected in format (mp4|avi|mkv). The carrier must have at
        least as many frames as the payload.

        Args:
            carrier: Carrier video in format (mp4|avi|mkv). Read with
                `stegobox.io.video.read()`.
            payload: Payload (secret video) to be encoded in format(mp4|avi|mkv). Read
                with `stegobox.io.video.read()`.
            output_path: Path to save the encoded steganographic video.

        Raises:
            ValueError: If the carrier reports fewer frames than the payload, or if
                the carrier runs out of readable frames before the payload does.
        """

        cover_cv, cover_ffmpeg = carrier
        sec_cv, sec_ffmpeg = payload
        self.fn = 0

        src = cover_cv
        src_w = int(src.get(cv2.CAP_PROP_FRAME_WIDTH))
        src_h = int(src.get(cv2.CAP_PROP_FRAME_HEIGHT))
        src_fps = src.get(cv2.CAP_PROP_FPS)
        src_frame_cnt = src.get(cv2.CAP_PROP_FRAME_COUNT)

        sec = sec_cv
        sec_w = int(sec.get(cv2.CAP_PROP_FRAME_WIDTH))
        sec_h = int(sec.get(cv2.CAP_PROP_FRAME_HEIGHT))

        sec_frame_cnt = sec.get(cv2.CAP_PROP_FRAME_COUNT)

        if src_frame_cnt < sec_frame_cnt:
            raise ValueError("Carrier video must be longer than or equal to payload.")

        with tempfile.TemporaryDirectory() as tempfile_path:
            # ---- audio: extract both tracks to WAV ---- #
            (
                ffmpeg.output(cover_ffmpeg.audio, f"{tempfile_path}/cover.wav")
                .overwrite_output()
                .run()
            )

            (
                ffmpeg.output(sec_ffmpeg.audio, f"{tempfile_path}/sec.wav")
                .overwrite_output()
                .run()
            )

            # Read the raw sample bytes; the context managers close the files even
            # if reading fails.
            with wave.open(f"{tempfile_path}/sec.wav", "rb") as s:
                s_params = s.getparams()
                s_frames = np.frombuffer(s.readframes(s.getnframes()), dtype="uint8")
            with wave.open(f"{tempfile_path}/cover.wav", "rb") as c:
                c_frames = np.frombuffer(c.readframes(c.getnframes()), dtype="uint8")

            # Zero-pad the shorter byte stream so both have the same length.
            n_bytes = max(s_frames.shape[0], c_frames.shape[0])
            s_frames = np.pad(s_frames, (0, n_bytes - s_frames.shape[0]))
            c_frames = np.pad(c_frames, (0, n_bytes - c_frames.shape[0]))

            # Keep the carrier's high nibble, store the payload's high nibble in
            # the low nibble.
            enc_frames = (c_frames & 0b11110000) | ((s_frames & 0b11110000) >> 4)

            with wave.open(f"{tempfile_path}/temp_stego_audio.wav", "wb") as e:
                # The stego track is written with the payload's WAV parameters.
                e.setparams(s_params)
                e.writeframes(enc_frames.tobytes())

            # ---- video: two stego PNG frames per payload frame ---- #
            with Progress() as progress:
                task1 = progress.add_task("frames", total=sec_frame_cnt * 2)
                while True:
                    src_ok, src_frame = src.read()
                    ret, sec_frame = sec.read()

                    # The payload drives the loop: stop once it is exhausted.
                    if not ret:
                        break
                    if not src_ok:
                        # The reported frame count was checked above, but the
                        # carrier can still yield fewer readable frames.
                        raise ValueError(
                            "Carrier video ran out of frames before the payload."
                        )

                    # get aspect ratio
                    src_ar = src_w / src_h
                    sec_ar = sec_w / sec_h

                    # Shrink the payload frame only when it does not fit into the
                    # carrier frame.
                    if src_ar == sec_ar and src_frame.shape < sec_frame.shape:
                        sec_frame = self._resize(sec_frame, src_w, src_h)
                    elif src_ar != sec_ar and (src_w < sec_w or src_h < sec_h):
                        if sec_w > sec_h:
                            sec_frame = self._resize(sec_frame, w=src_w, ar=sec_ar)
                            if sec_frame.shape[0] > src_h:
                                sec_frame = self._resize(sec_frame, h=src_h, ar=sec_ar)
                        else:
                            sec_frame = self._resize(sec_frame, h=src_h, ar=sec_ar)
                            if sec_frame.shape[1] > src_w:
                                sec_frame = self._resize(sec_frame, w=src_w, ar=sec_ar)

                    # -- fill the remaining pixel with black color -- #
                    sec_frame = cv2.hconcat(
                        [
                            sec_frame,
                            np.zeros(
                                (sec_frame.shape[0], src_w - sec_frame.shape[1], 3),
                                dtype="uint8",
                            ),
                        ]  # type: ignore
                    )
                    sec_frame = cv2.vconcat(
                        [
                            sec_frame,
                            np.zeros(
                                (src_h - sec_frame.shape[0], sec_frame.shape[1], 3),
                                dtype="uint8",
                            ),
                        ]  # type: ignore
                    )

                    # First copy: payload bits 4-5 go into the carrier's 2 LSBs.
                    encrypted_img = (src_frame & 0b11111100) | (
                        sec_frame >> 4 & 0b00000011
                    )
                    self.fn = self.fn + 1
                    cv2.imwrite(f"{tempfile_path}/{self.fn}.png", encrypted_img)

                    # Second copy: payload bits 6-7 go into the carrier's 2 LSBs.
                    encrypted_img = (src_frame & 0b11111100) | (sec_frame >> 6)
                    self.fn = self.fn + 1
                    cv2.imwrite(f"{tempfile_path}/{self.fn}.png", encrypted_img)

                    progress.update(task1, advance=2)

            src.release()
            sec.release()

            # save the video using ffmpeg as a lossless video
            # frame rate is doubled to preserve the speed of cover video
            ffmpeg.input(f"{tempfile_path}/%d.png", framerate=src_fps * 2).output(
                f"{tempfile_path}/temp_stego_video.avi", vcodec="copy"
            ).overwrite_output().run()

            video = ffmpeg.input(f"{tempfile_path}/temp_stego_video.avi")
            audio = ffmpeg.input(f"{tempfile_path}/temp_stego_audio.wav")

            # Mux the stego audio and video without re-encoding either stream.
            out = ffmpeg.output(
                audio.audio, video.video, output_path, acodec="copy", vcodec="copy"
            ).overwrite_output()
            out.run()

    def decode(self, _):
        raise NotImplementedError("Use decode_write_video() instead.")

    def decode_write_video(
        self,
        carrier: tuple[cv2.VideoCapture, ffmpeg.nodes.FilterableStream],
        output_path: str,
    ) -> None:
        """Decode the secret payload from the carrier video.

        Args:
            carrier: Carrier video in format (mp4|avi|mkv). Read with
                `stegobox.io.video.read()`.
            output_path: The path to save the extract video.
        """

        video_cv, video_ffmpeg = carrier
        enc = video_cv
        enc_w = int(enc.get(cv2.CAP_PROP_FRAME_WIDTH))
        enc_h = int(enc.get(cv2.CAP_PROP_FRAME_HEIGHT))
        enc_fps = enc.get(cv2.CAP_PROP_FPS)
        enc_frame_cnt = enc.get(cv2.CAP_PROP_FRAME_COUNT)

        with tempfile.TemporaryDirectory() as tempfile_path:
            # Two stego frames make one payload frame, hence half the frame rate.
            writer = cv2.VideoWriter(
                f"{tempfile_path}/extract_video.avi",
                cv2.VideoWriter_fourcc(*"MJPG"),
                enc_fps / 2,
                (enc_w, enc_h),
            )

            ffmpeg.output(
                video_ffmpeg.audio, f"{tempfile_path}/enc.wav", acodec="copy"
            ).overwrite_output().run()

            # ---- audio: move the low nibble back into the high nibble ---- #
            with wave.open(f"{tempfile_path}/enc.wav", "rb") as e:
                e_params = e.getparams()
                e_frames = np.frombuffer(e.readframes(e.getnframes()), dtype="uint8")

            dec_frames = (e_frames & 0b00001111) << 4

            with wave.open(f"{tempfile_path}/extract_audio.wav", "wb") as d:
                d.setparams(e_params)
                d.writeframes(dec_frames.tobytes())

            # ---- video: merge every odd/even frame pair ---- #
            fn = 0
            dec_frame = None

            with Progress() as progress:
                task = progress.add_task("frames", total=enc_frame_cnt)

                while True:
                    ret, frame = enc.read()

                    if not ret:
                        break

                    fn = fn + 1

                    # Odd frames carry payload bits 4-5, even frames bits 6-7.
                    # A trailing odd frame without its partner is never written.
                    if fn % 2:
                        dec_frame = (frame & 0b00000011) << 4
                    else:
                        high_bits = (frame & 0b00000011) << 6
                        dec_frame = dec_frame | high_bits  # type: ignore

                        writer.write(dec_frame)

                    progress.update(task, advance=1)

            enc.release()
            writer.release()
            video = ffmpeg.input(f"{tempfile_path}/extract_video.avi")
            audio = ffmpeg.input(f"{tempfile_path}/extract_audio.wav")

            # Mux the recovered audio and video without re-encoding either stream.
            muxed = ffmpeg.output(
                audio.audio, video.video, output_path, acodec="copy", vcodec="copy"
            ).overwrite_output()
            muxed.run()

__init__()

Hide video in video with LSB.

Source code in stegobox/codec/videohidevideo_lsb.py
def __init__(self) -> None:
    """Set up the codec for hiding a video in a video with LSB."""
    # Frame counter; starts at zero for a fresh codec instance.
    self.fn = int()

encode_write_video(carrier, payload, output_path)

Encode requires the carrier to be a video in format (mp4|avi|mkv) and the payload to be a video in format (mp4|avi|mkv); the carrier must be at least as long as the payload.

Parameters:

Name Type Description Default
carrier tuple[VideoCapture, FilterableStream]

Carrier video in format (mp4|avi|mkv). Read with stegobox.io.video.read().

required
payload tuple[VideoCapture, FilterableStream]

Payload (secret video) to be encoded in format(mp4|avi|mkv). Read with stegobox.io.video.read().

required
output_path str

Path to save the encoded steganographic video.

required
Source code in stegobox/codec/videohidevideo_lsb.py
def encode_write_video(
    self,
    carrier: tuple[cv2.VideoCapture, ffmpeg.nodes.FilterableStream],
    payload: tuple[cv2.VideoCapture, ffmpeg.nodes.FilterableStream],
    output_path: str,
) -> None:
    """Hide the payload video in the carrier video and write the result.

    Both videos are expected in format (mp4|avi|mkv). The carrier must have at
    least as many frames as the payload.

    Args:
        carrier: Carrier video in format (mp4|avi|mkv). Read with
            `stegobox.io.video.read()`.
        payload: Payload (secret video) to be encoded in format(mp4|avi|mkv). Read
            with `stegobox.io.video.read()`.
        output_path: Path to save the encoded steganographic video.

    Raises:
        ValueError: If the carrier reports fewer frames than the payload, or if
            the carrier runs out of readable frames before the payload does.
    """

    cover_cv, cover_ffmpeg = carrier
    sec_cv, sec_ffmpeg = payload
    self.fn = 0

    src = cover_cv
    src_w = int(src.get(cv2.CAP_PROP_FRAME_WIDTH))
    src_h = int(src.get(cv2.CAP_PROP_FRAME_HEIGHT))
    src_fps = src.get(cv2.CAP_PROP_FPS)
    src_frame_cnt = src.get(cv2.CAP_PROP_FRAME_COUNT)

    sec = sec_cv
    sec_w = int(sec.get(cv2.CAP_PROP_FRAME_WIDTH))
    sec_h = int(sec.get(cv2.CAP_PROP_FRAME_HEIGHT))

    sec_frame_cnt = sec.get(cv2.CAP_PROP_FRAME_COUNT)

    if src_frame_cnt < sec_frame_cnt:
        raise ValueError("Carrier video must be longer than or equal to payload.")

    with tempfile.TemporaryDirectory() as tempfile_path:
        # ---- audio: extract both tracks to WAV ---- #
        (
            ffmpeg.output(cover_ffmpeg.audio, f"{tempfile_path}/cover.wav")
            .overwrite_output()
            .run()
        )

        (
            ffmpeg.output(sec_ffmpeg.audio, f"{tempfile_path}/sec.wav")
            .overwrite_output()
            .run()
        )

        # Read the raw sample bytes; the context managers close the files even
        # if reading fails.
        with wave.open(f"{tempfile_path}/sec.wav", "rb") as s:
            s_params = s.getparams()
            s_frames = np.frombuffer(s.readframes(s.getnframes()), dtype="uint8")
        with wave.open(f"{tempfile_path}/cover.wav", "rb") as c:
            c_frames = np.frombuffer(c.readframes(c.getnframes()), dtype="uint8")

        # Zero-pad the shorter byte stream so both have the same length.
        n_bytes = max(s_frames.shape[0], c_frames.shape[0])
        s_frames = np.pad(s_frames, (0, n_bytes - s_frames.shape[0]))
        c_frames = np.pad(c_frames, (0, n_bytes - c_frames.shape[0]))

        # Keep the carrier's high nibble, store the payload's high nibble in the
        # low nibble.
        enc_frames = (c_frames & 0b11110000) | ((s_frames & 0b11110000) >> 4)

        with wave.open(f"{tempfile_path}/temp_stego_audio.wav", "wb") as e:
            # The stego track is written with the payload's WAV parameters.
            e.setparams(s_params)
            e.writeframes(enc_frames.tobytes())

        # ---- video: two stego PNG frames per payload frame ---- #
        with Progress() as progress:
            task1 = progress.add_task("frames", total=sec_frame_cnt * 2)
            while True:
                src_ok, src_frame = src.read()
                ret, sec_frame = sec.read()

                # The payload drives the loop: stop once it is exhausted.
                if not ret:
                    break
                if not src_ok:
                    # The reported frame count was checked above, but the carrier
                    # can still yield fewer readable frames.
                    raise ValueError(
                        "Carrier video ran out of frames before the payload."
                    )

                # get aspect ratio
                src_ar = src_w / src_h
                sec_ar = sec_w / sec_h

                # Shrink the payload frame only when it does not fit into the
                # carrier frame.
                if src_ar == sec_ar and src_frame.shape < sec_frame.shape:
                    sec_frame = self._resize(sec_frame, src_w, src_h)
                elif src_ar != sec_ar and (src_w < sec_w or src_h < sec_h):
                    if sec_w > sec_h:
                        sec_frame = self._resize(sec_frame, w=src_w, ar=sec_ar)
                        if sec_frame.shape[0] > src_h:
                            sec_frame = self._resize(sec_frame, h=src_h, ar=sec_ar)
                    else:
                        sec_frame = self._resize(sec_frame, h=src_h, ar=sec_ar)
                        if sec_frame.shape[1] > src_w:
                            sec_frame = self._resize(sec_frame, w=src_w, ar=sec_ar)

                # -- fill the remaining pixel with black color -- #
                sec_frame = cv2.hconcat(
                    [
                        sec_frame,
                        np.zeros(
                            (sec_frame.shape[0], src_w - sec_frame.shape[1], 3),
                            dtype="uint8",
                        ),
                    ]  # type: ignore
                )
                sec_frame = cv2.vconcat(
                    [
                        sec_frame,
                        np.zeros(
                            (src_h - sec_frame.shape[0], sec_frame.shape[1], 3),
                            dtype="uint8",
                        ),
                    ]  # type: ignore
                )

                # First copy: payload bits 4-5 go into the carrier's 2 LSBs.
                encrypted_img = (src_frame & 0b11111100) | (
                    sec_frame >> 4 & 0b00000011
                )
                self.fn = self.fn + 1
                cv2.imwrite(f"{tempfile_path}/{self.fn}.png", encrypted_img)

                # Second copy: payload bits 6-7 go into the carrier's 2 LSBs.
                encrypted_img = (src_frame & 0b11111100) | (sec_frame >> 6)
                self.fn = self.fn + 1
                cv2.imwrite(f"{tempfile_path}/{self.fn}.png", encrypted_img)

                progress.update(task1, advance=2)

        src.release()
        sec.release()

        # save the video using ffmpeg as a lossless video
        # frame rate is doubled to preserve the speed of cover video
        ffmpeg.input(f"{tempfile_path}/%d.png", framerate=src_fps * 2).output(
            f"{tempfile_path}/temp_stego_video.avi", vcodec="copy"
        ).overwrite_output().run()

        video = ffmpeg.input(f"{tempfile_path}/temp_stego_video.avi")
        audio = ffmpeg.input(f"{tempfile_path}/temp_stego_audio.wav")

        # Mux the stego audio and video without re-encoding either stream.
        out = ffmpeg.output(
            audio.audio, video.video, output_path, acodec="copy", vcodec="copy"
        ).overwrite_output()
        out.run()

decode_write_video(carrier, output_path)

Decode the secret payload from the carrier video.

Parameters:

Name Type Description Default
carrier tuple[VideoCapture, FilterableStream]

Carrier video in format (mp4|avi|mkv). Read with stegobox.io.video.read().

required
output_path str

The path to save the extract video.

required
Source code in stegobox/codec/videohidevideo_lsb.py
def decode_write_video(
    self,
    carrier: tuple[cv2.VideoCapture, ffmpeg.nodes.FilterableStream],
    output_path: str,
) -> None:
    """Decode the secret payload from the carrier video.

    Args:
        carrier: Carrier video in format (mp4|avi|mkv). Read with
            `stegobox.io.video.read()`.
        output_path: The path to save the extract video.
    """

    video_cv, video_ffmpeg = carrier
    enc = video_cv
    enc_w = int(enc.get(cv2.CAP_PROP_FRAME_WIDTH))
    enc_h = int(enc.get(cv2.CAP_PROP_FRAME_HEIGHT))
    enc_fps = enc.get(cv2.CAP_PROP_FPS)
    enc_frame_cnt = enc.get(cv2.CAP_PROP_FRAME_COUNT)

    with tempfile.TemporaryDirectory() as tempfile_path:
        # Two stego frames make one payload frame, hence half the frame rate.
        writer = cv2.VideoWriter(
            f"{tempfile_path}/extract_video.avi",
            cv2.VideoWriter_fourcc(*"MJPG"),
            enc_fps / 2,
            (enc_w, enc_h),
        )

        ffmpeg.output(
            video_ffmpeg.audio, f"{tempfile_path}/enc.wav", acodec="copy"
        ).overwrite_output().run()

        # ---- audio: move the low nibble back into the high nibble ---- #
        # The context manager closes the input file even if reading fails.
        with wave.open(f"{tempfile_path}/enc.wav", "rb") as e:
            e_params = e.getparams()
            e_frames = np.frombuffer(e.readframes(e.getnframes()), dtype="uint8")

        dec_frames = (e_frames & 0b00001111) << 4

        with wave.open(f"{tempfile_path}/extract_audio.wav", "wb") as d:
            d.setparams(e_params)
            d.writeframes(dec_frames.tobytes())

        # ---- video: merge every odd/even frame pair ---- #
        fn = 0
        dec_frame = None

        with Progress() as progress:
            task = progress.add_task("frames", total=enc_frame_cnt)

            while True:
                ret, frame = enc.read()

                if not ret:
                    break

                fn = fn + 1

                # Odd frames carry payload bits 4-5, even frames bits 6-7.
                # A trailing odd frame without its partner is never written.
                if fn % 2:
                    dec_frame = (frame & 0b00000011) << 4
                else:
                    high_bits = (frame & 0b00000011) << 6
                    dec_frame = dec_frame | high_bits  # type: ignore

                    writer.write(dec_frame)

                progress.update(task, advance=1)

        enc.release()
        writer.release()
        video = ffmpeg.input(f"{tempfile_path}/extract_video.avi")
        audio = ffmpeg.input(f"{tempfile_path}/extract_audio.wav")

        # Mux the recovered audio and video without re-encoding either stream.
        muxed = ffmpeg.output(
            audio.audio, video.video, output_path, acodec="copy", vcodec="copy"
        ).overwrite_output()
        muxed.run()