Skip to content

CCVS

stegobox.codec.CCVS

Bases: BaseCodec

This steganography method is named CCVS (Caesar Cipher Video Steganography)

  • Created by: Jiayao Yang
  • Created time: 2022/11/9

CCVS is a simple program written in python to hide encrypted text using simple caesar cipher in video frames, the encryption algorithm could be easily changed because the cipher algorithm is loosly coupled.

Source code derived from: r9ht/Caesar-Cipher-Video-Steganography.

Source code in stegobox/codec/ccvs.py
class CCVS(BaseCodec):
    """
    This steganography method is named CCVS (Caesar Cipher Video Steganography)

    * Created by: Jiayao Yang
    * Created time: 2022/11/9

    CCVS is a simple program written in python to hide encrypted text using simple
    caesar cipher in video frames, the encryption algorithm could be easily changed
    because the cipher algorithm is loosly coupled.

    Source code derived from:
    [r9ht/Caesar-Cipher-Video-Steganography](https://github.com/r9ht/Caesar-Cipher-Video-Steganography).
    """

    def __init__(self, tmp_frame: str = "./temp", caesarn: int = 5) -> None:
        super().__init__()
        self.tmp_frame = tmp_frame
        self.caesarn = caesarn

    def encode(self, _):
        raise NotImplementedError("Use method encode_write_video instead")

    def decode(self, _):
        raise NotImplementedError("Use method decode_write_txt instead")

    def _split2len(self, s, n):
        def _f(s, n):
            while s:
                yield s[:n]
                s = s[n:]

        return list(_f(s, n))

    def _frame_extract(self, video):
        if not os.path.exists(self.tmp_frame):
            os.makedirs(self.tmp_frame)
        temp_folder = self.tmp_frame
        print("temp directory is created")

        count = 0
        vidcap = video

        while True:
            success, image = vidcap.read()
            if not success:
                break
            cv2.imwrite(os.path.join(temp_folder, "{:d}.png".format(count)), image)
            count += 1

    def _caesar_ascii(self, char, mode, n) -> str:
        if mode == "enc":
            ascii = ord(char)
            return chr((ascii + n) % 128)
        elif mode == "dec":
            ascii = ord(char)
            return chr((ascii - n) % 128)
        else:
            return "error"

    def _encode_frame(self, frame_dir, text_to_hide, caesarn):

        # open the text file

        text_to_hide_open = open(text_to_hide, "r")
        text_to_hide = repr(text_to_hide_open.read())

        # split text to max 255 char each

        text_to_hide_chopped = self._split2len(text_to_hide, 255)

        for text in text_to_hide_chopped:
            length = len(text)
            chopped_text_index = text_to_hide_chopped.index(text)
            frame = Image.open(frame_dir + "/" + str(chopped_text_index + 1) + ".png")

            if frame.mode != "RGB":
                raise TypeError("Source frame must be in RGB format")

            # use copy of the file

            encoded = frame.copy()
            width, height = frame.size

            index = 0
            # a = object
            for row in range(height):
                for col in range(width):
                    r, g, b = frame.getpixel((col, row))

                    # first value is length of the message per frame
                    if row == 0 and col == 0 and index < length:
                        asc = length
                        if text_to_hide_chopped.index(text) == 0:
                            total_encoded_frame = len(text_to_hide_chopped)
                        else:
                            total_encoded_frame = g
                    elif index <= length:
                        c = text[index - 1]
                        # put the encypted character into ascii value
                        asc = ord(self._caesar_ascii(c, "enc", caesarn))
                        total_encoded_frame = g
                    else:
                        asc = r
                        total_encoded_frame = g
                    encoded.putpixel((col, row), (asc, total_encoded_frame, b))
                    index += 1
            if encoded:
                encoded.save(
                    str(frame_dir) + "/" + str(chopped_text_index + 1) + ".png",
                    compress_level=0,
                )

    def _decode_frame(self, frame_dir, caesarn, output):

        # take the first frame to get width, height, and total encoded frame

        # first_frame = Image.open(str(frame_dir) + "/0.jpg")
        first_frame = Image.open(str(frame_dir) + "/" + "1.png")
        r, g, b = first_frame.getpixel((0, 0))
        total_encoded_frame = g
        msg = ""
        for i in range(1, total_encoded_frame + 1):
            frame = Image.open(str(frame_dir) + "/" + str(i) + ".png")
            width, height = frame.size
            index = 0
            for row in range(height):
                for col in range(width):
                    try:
                        r, g, b = frame.getpixel((col, row))
                    except ValueError:

                        # for some ong a(transparancy) is needed
                        r, g, b, a = frame.getpixel((col, row))
                    if row == 0 and col == 0:
                        length = r
                    elif index <= length:  # type: ignore
                        # put the decrypted character into string
                        msg += self._caesar_ascii(chr(r), "dec", caesarn)
                    index += 1
        # remove the first and the last quote
        msg = msg[1:-1]
        recovered_txt = open(output, "w")
        # recovered_txt.write(str(msg.decode("string_escape")))
        recovered_txt.write(
            str(
                msg.encode("latin1")
                .decode("unicode-escape")
                .encode("latin1")
                .decode("utf-8")
            )
        )
        # recovered_txt.write(msg.encode().decode("unicode_escape", "surrogatepass"))
        # recovered_txt.write(msg)
        if os.path.isfile(self.tmp_frame):
            os.remove(self.tmp_frame)  # remove the file
        elif os.path.isdir(self.tmp_frame):
            shutil.rmtree(self.tmp_frame)  # remove dir and all contains
        else:
            raise ValueError("file {} is not a file or dir.".format(self.tmp_frame))

    def encode_write_video(
        self, carrier: cv2.VideoCapture, payload: str, output: str
    ) -> None:
        """Caesar-Cipher-Video-Steganography.

        Args:
            carrier: Carrier video in format mp4. Read with `stegobox.io.video.read()`.
            payload: Secret payload message in format txt.
            output: Method will write the encoded video to this path.
        """
        self._frame_extract(carrier)

        # using system call
        # ffmpeg -i data/chef.mp4 -q:a 0 -map a temp/audio.mp3 -y
        # 2>/dev/null for supressing the output from ffmpeg
        """
        call(
            [
                "ffmpeg",
                "-i",
                "./data/video/v1.mp4",
                "-q:a",
                "0",
                "-map",
                "a",
                self.tmp_frame + "/audio.mp3",
                "-y",
            ],
            stdout=open(os.devnull, "w"),
            stderr=STDOUT,
        )
        """
        video = ffmpeg.input("./data/video/v1.mp4")
        out = ffmpeg.output(
            video,
            self.tmp_frame + "/audio.mp3",
            **{"qscale:a": 0},
            **{"map": "a"},
        ).overwrite_output()
        out.run()

        self._encode_frame(self.tmp_frame, payload, self.caesarn)

        # ffmpeg -i temp/%d.png -vcodec png data/enc-filename.mov
        """
        call(
            [
                "ffmpeg",
                "-i",
                self.tmp_frame + "/%d.png",
                "-vcodec",
                "png",
                self.tmp_frame + "/video.mov",
                "-y",
            ],
            stdout=open(os.devnull, "w"),
            stderr=STDOUT,
        )
        """
        video = ffmpeg.input(self.tmp_frame + "/%d.png")
        out = ffmpeg.output(
            video.video, self.tmp_frame + "/video.mov", acodec="copy", vcodec="png"
        ).overwrite_output()
        out.run()

        # ffmpeg -i temp/temp-video.avi -i temp/audio.mp3
        # -codec copy data/enc-chef.mp4 -y
        """
        call(
            [
                "ffmpeg",
                "-i",
                self.tmp_frame + "/video.mov",
                "-i",
                self.tmp_frame + "/audio.mp3",
                "-codec",
                "copy",
                str(output),
                "-y",
            ],
            stdout=open(os.devnull, "w"),
            stderr=STDOUT,
        )
        """
        video = ffmpeg.input(self.tmp_frame + "/video.mov")
        audio = ffmpeg.input(self.tmp_frame + "/audio.mp3")
        out = ffmpeg.output(
            audio.audio, video.video, str(output), codec="copy"
        ).overwrite_output()
        out.run()
        print("(!) Success , output : " + str(output))

    def decode_write_txt(self, carrier: cv2.VideoCapture, output_txt: str):
        """Decode the secret from the encoded video.

        Args:
            carrier: The encoded video mp4. Read with `stegobox.io.video.read().
            output_txt: Method will write the secret message to this path.
        """
        self._frame_extract(carrier)
        self._decode_frame(self.tmp_frame, self.caesarn, output_txt)

encode_write_video(carrier, payload, output)

Caesar-Cipher-Video-Steganography.

Parameters:

Name Type Description Default
carrier VideoCapture

Carrier video in format mp4. Read with stegobox.io.video.read().

required
payload str

Secret payload message in format txt.

required
output str

Method will write the encoded video to this path.

required
Source code in stegobox/codec/ccvs.py
def encode_write_video(
    self, carrier: cv2.VideoCapture, payload: str, output: str
) -> None:
    """Caesar-Cipher-Video-Steganography.

    Args:
        carrier: Carrier video in format mp4. Read with `stegobox.io.video.read()`.
        payload: Secret payload message in format txt.
        output: Method will write the encoded video to this path.
    """
    self._frame_extract(carrier)

    # using system call
    # ffmpeg -i data/chef.mp4 -q:a 0 -map a temp/audio.mp3 -y
    # 2>/dev/null for supressing the output from ffmpeg
    """
    call(
        [
            "ffmpeg",
            "-i",
            "./data/video/v1.mp4",
            "-q:a",
            "0",
            "-map",
            "a",
            self.tmp_frame + "/audio.mp3",
            "-y",
        ],
        stdout=open(os.devnull, "w"),
        stderr=STDOUT,
    )
    """
    video = ffmpeg.input("./data/video/v1.mp4")
    out = ffmpeg.output(
        video,
        self.tmp_frame + "/audio.mp3",
        **{"qscale:a": 0},
        **{"map": "a"},
    ).overwrite_output()
    out.run()

    self._encode_frame(self.tmp_frame, payload, self.caesarn)

    # ffmpeg -i temp/%d.png -vcodec png data/enc-filename.mov
    """
    call(
        [
            "ffmpeg",
            "-i",
            self.tmp_frame + "/%d.png",
            "-vcodec",
            "png",
            self.tmp_frame + "/video.mov",
            "-y",
        ],
        stdout=open(os.devnull, "w"),
        stderr=STDOUT,
    )
    """
    video = ffmpeg.input(self.tmp_frame + "/%d.png")
    out = ffmpeg.output(
        video.video, self.tmp_frame + "/video.mov", acodec="copy", vcodec="png"
    ).overwrite_output()
    out.run()

    # ffmpeg -i temp/temp-video.avi -i temp/audio.mp3
    # -codec copy data/enc-chef.mp4 -y
    """
    call(
        [
            "ffmpeg",
            "-i",
            self.tmp_frame + "/video.mov",
            "-i",
            self.tmp_frame + "/audio.mp3",
            "-codec",
            "copy",
            str(output),
            "-y",
        ],
        stdout=open(os.devnull, "w"),
        stderr=STDOUT,
    )
    """
    video = ffmpeg.input(self.tmp_frame + "/video.mov")
    audio = ffmpeg.input(self.tmp_frame + "/audio.mp3")
    out = ffmpeg.output(
        audio.audio, video.video, str(output), codec="copy"
    ).overwrite_output()
    out.run()
    print("(!) Success , output : " + str(output))

decode_write_txt(carrier, output_txt)

Decode the secret from the encoded video.

Parameters:

Name Type Description Default
carrier VideoCapture

The encoded video mp4. Read with `stegobox.io.video.read().

required
output_txt str

Method will write the secret message to this path.

required
Source code in stegobox/codec/ccvs.py
def decode_write_txt(self, carrier: cv2.VideoCapture, output_txt: str):
    """Decode the secret from the encoded video.

    Args:
        carrier: The encoded video mp4. Read with `stegobox.io.video.read().
        output_txt: Method will write the secret message to this path.
    """
    self._frame_extract(carrier)
    self._decode_frame(self.tmp_frame, self.caesarn, output_txt)