class CCVS(BaseCodec):
    """Caesar Cipher Video Steganography (CCVS).

    * Created by: Jiayao Yang
    * Created time: 2022/11/9

    CCVS is a simple program written in Python that hides Caesar-encrypted text
    in video frames. The cipher is loosely coupled to the embedding code (it
    lives entirely in ``_caesar_ascii``), so it can easily be swapped out.

    On-disk layout produced by ``_encode_frame`` and read by ``_decode_frame``
    (frames are the PNG files ``<tmp_frame>/<n>.png``):

    * The message is escaped, then cut into chunks of at most 255 characters;
      chunk ``i`` (0-based) is stored in frame ``i + 1``.
    * Pixel (0, 0) of each used frame stores the chunk length in its red
      channel. In frame 1 its green channel additionally stores the total
      number of frames that carry data.
    * The following pixels, in row-major order, store one encrypted character
      code each in their red channel.

    Source code derived from:
    [r9ht/Caesar-Cipher-Video-Steganography](https://github.com/r9ht/Caesar-Cipher-Video-Steganography).
    """

    # A chunk length is stored in a single 8-bit colour channel.
    _MAX_CHUNK_LEN = 255
    # The number of encoded frames is stored in a single 8-bit channel as well.
    _MAX_ENCODED_FRAMES = 255

    def __init__(self, tmp_frame: str = "./temp", caesarn: int = 5) -> None:
        """Create the codec.

        Args:
            tmp_frame: Working directory that receives the extracted frames and
                the intermediate audio/video files.
            caesarn: Shift applied by the Caesar cipher.
        """
        super().__init__()
        self.tmp_frame = tmp_frame
        self.caesarn = caesarn

    def encode(self, _):
        """Unsupported; this codec works on files, see ``encode_write_video``."""
        raise NotImplementedError("Use method encode_write_video instead")

    def decode(self, _):
        """Unsupported; this codec works on files, see ``decode_write_txt``."""
        raise NotImplementedError("Use method decode_write_txt instead")

    def _split2len(self, s, n):
        """Split ``s`` into consecutive pieces of at most ``n`` items.

        Args:
            s: Sliceable sequence (a string in this class).
            n: Maximum piece length; must be positive.

        Returns:
            List of the pieces in order; empty when ``s`` is empty.

        Raises:
            ValueError: If ``n`` is not positive (the split would never end).
        """
        if n <= 0:
            raise ValueError("chunk size must be a positive integer")
        return [s[start : start + n] for start in range(0, len(s), n)]

    def _frame_extract(self, video):
        """Dump every frame of ``video`` to ``<tmp_frame>/<index>.png``.

        Args:
            video: Object with a ``read()`` method returning ``(success, image)``
                (a ``cv2.VideoCapture`` in this class). Frames are numbered
                from 0 in the order they are read.
        """
        # exist_ok avoids the check-then-create race of an explicit exists().
        os.makedirs(self.tmp_frame, exist_ok=True)
        print("temp directory is created")
        count = 0
        while True:
            success, image = video.read()
            if not success:
                break
            cv2.imwrite(
                os.path.join(self.tmp_frame, "{:d}.png".format(count)), image
            )
            count += 1

    def _caesar_ascii(self, char, mode, n) -> str:
        """Shift one character by ``n`` within the 7-bit ASCII range.

        Args:
            char: Single character to transform.
            mode: ``"enc"`` to shift forward, ``"dec"`` to shift backward.
            n: Shift amount.

        Returns:
            The shifted character; its code is always in ``0..127``.

        Raises:
            ValueError: If ``mode`` is neither ``"enc"`` nor ``"dec"``.
        """
        if mode == "enc":
            shift = n
        elif mode == "dec":
            shift = -n
        else:
            # Returning a sentinel string here would get embedded as data.
            raise ValueError(
                "mode must be 'enc' or 'dec', got {!r}".format(mode)
            )
        return chr((ord(char) + shift) % 128)

    def _encode_frame(self, frame_dir, text_to_hide, caesarn):
        """Embed the content of a text file into the extracted frames.

        Args:
            frame_dir: Directory holding the frames as ``<n>.png``.
            text_to_hide: Path of the text file whose content is hidden.
            caesarn: Caesar shift used to encrypt each character.

        Raises:
            TypeError: If a frame that must carry data is not in RGB mode.
            ValueError: If the message needs more frames than the one-byte
                frame counter can express, or a frame has too few pixels to
                hold its chunk.
        """
        with open(text_to_hide, "r") as payload_file:
            plain_text = payload_file.read()

        # Escape the UTF-8 bytes of the text (dropping the leading ``b`` of the
        # bytes repr). The result is pure printable ASCII, which is all the
        # mod-128 cipher can carry, and it is exactly the quoted/escaped form
        # that ``_decode_frame`` unescapes. For ASCII-only text this equals
        # ``repr(plain_text)``.
        escaped = repr(plain_text.encode("utf-8"))[1:]
        chunks = self._split2len(escaped, self._MAX_CHUNK_LEN)

        if len(chunks) > self._MAX_ENCODED_FRAMES:
            raise ValueError(
                "Payload needs {} frames but at most {} are supported".format(
                    len(chunks), self._MAX_ENCODED_FRAMES
                )
            )

        # enumerate (not list.index) so identical chunks get distinct frames.
        for chunk_index, chunk in enumerate(chunks):
            # Frame 0 is intentionally left untouched; data starts at 1.png.
            frame_path = os.path.join(
                str(frame_dir), "{:d}.png".format(chunk_index + 1)
            )
            with Image.open(frame_path) as frame:
                if frame.mode != "RGB":
                    raise TypeError("Source frame must be in RGB format")
                # Work on an in-memory copy so the file handle can be released
                # before the frame is written back to the same path.
                encoded = frame.copy()

            width, height = encoded.size
            length = len(chunk)
            if width * height < length + 1:
                raise ValueError(
                    "Frame {} has {} pixels but {} are required".format(
                        frame_path, width * height, length + 1
                    )
                )

            # Header pixel: chunk length in red; the first frame also records
            # the number of encoded frames in green.
            _, green, blue = encoded.getpixel((0, 0))
            frame_count = len(chunks) if chunk_index == 0 else green
            encoded.putpixel((0, 0), (length, frame_count, blue))

            # Payload pixels follow the header in row-major order; only the
            # red channel is replaced.
            for offset, char in enumerate(chunk, start=1):
                position = (offset % width, offset // width)
                _, green, blue = encoded.getpixel(position)
                # Store the encrypted character as its ASCII value.
                code = ord(self._caesar_ascii(char, "enc", caesarn))
                encoded.putpixel(position, (code, green, blue))

            encoded.save(frame_path, compress_level=0)

    def _decode_frame(self, frame_dir, caesarn, output):
        """Recover the hidden text from the frames and write it to ``output``.

        After writing, the temporary path ``self.tmp_frame`` is deleted.

        Args:
            frame_dir: Directory holding the frames as ``<n>.png``.
            caesarn: Caesar shift that was used for encryption.
            output: Path of the text file to write the message to.

        Raises:
            ValueError: If ``self.tmp_frame`` is neither a file nor a
                directory when cleanup runs.
        """
        # The first data frame records how many frames carry data (green
        # channel of pixel (0, 0)). Indexing the pixel tuple instead of
        # unpacking it also copes with PNGs that carry an alpha channel.
        first_path = os.path.join(str(frame_dir), "1.png")
        with Image.open(first_path) as first_frame:
            total_encoded_frame = first_frame.getpixel((0, 0))[1]

        decoded_chars = []
        for frame_number in range(1, total_encoded_frame + 1):
            frame_path = os.path.join(
                str(frame_dir), "{:d}.png".format(frame_number)
            )
            with Image.open(frame_path) as frame:
                width, height = frame.size
                # Red channel of the header pixel is the chunk length.
                length = frame.getpixel((0, 0))[0]
                # Never read past the last pixel of the frame.
                last_offset = min(length, width * height - 1)
                for offset in range(1, last_offset + 1):
                    red = frame.getpixel((offset % width, offset // width))[0]
                    decoded_chars.append(
                        self._caesar_ascii(chr(red), "dec", caesarn)
                    )

        # Remove the first and the last quote of the escaped representation.
        msg = "".join(decoded_chars)[1:-1]
        # Undo the backslash escapes, then reinterpret the resulting byte
        # values as UTF-8 text.
        recovered = (
            msg.encode("latin1")
            .decode("unicode-escape")
            .encode("latin1")
            .decode("utf-8")
        )
        with open(output, "w") as recovered_txt:
            recovered_txt.write(recovered)

        if os.path.isfile(self.tmp_frame):
            os.remove(self.tmp_frame)  # remove the file
        elif os.path.isdir(self.tmp_frame):
            shutil.rmtree(self.tmp_frame)  # remove dir and everything in it
        else:
            raise ValueError("file {} is not a file or dir.".format(self.tmp_frame))

    def encode_write_video(
        self,
        carrier: cv2.VideoCapture,
        payload: str,
        output: str,
        audio_source: str = "./data/video/v1.mp4",
    ) -> None:
        """Caesar-Cipher-Video-Steganography.

        Extracts the carrier's frames, embeds the payload into them, rebuilds a
        video from the frames and muxes it with the audio track taken from
        ``audio_source``.

        Args:
            carrier: Carrier video in format mp4. Read with
                `stegobox.io.video.read()`.
            payload: Secret payload message in format txt.
            output: Method will write the encoded video to this path.
            audio_source: Path of the media file whose audio track is copied
                into the output. ``carrier`` is an opened capture and does not
                expose its path, so pass the carrier's path here. Defaults to
                the path that used to be hard-coded.
        """
        audio_path = os.path.join(self.tmp_frame, "audio.mp3")
        video_path = os.path.join(self.tmp_frame, "video.mov")

        self._frame_extract(carrier)

        # Equivalent CLI:
        #   ffmpeg -i <audio_source> -q:a 0 -map a <tmp>/audio.mp3 -y
        ffmpeg.output(
            ffmpeg.input(audio_source),
            audio_path,
            **{"qscale:a": 0, "map": "a"},
        ).overwrite_output().run()

        self._encode_frame(self.tmp_frame, payload, self.caesarn)

        # Equivalent CLI:
        #   ffmpeg -i <tmp>/%d.png -vcodec png <tmp>/video.mov -y
        # The PNG codec keeps the frames lossless so the pixel values survive.
        frames = ffmpeg.input(os.path.join(self.tmp_frame, "%d.png"))
        ffmpeg.output(
            frames.video, video_path, acodec="copy", vcodec="png"
        ).overwrite_output().run()

        # Equivalent CLI:
        #   ffmpeg -i <tmp>/video.mov -i <tmp>/audio.mp3 -codec copy <output> -y
        video = ffmpeg.input(video_path)
        audio = ffmpeg.input(audio_path)
        ffmpeg.output(
            audio.audio, video.video, str(output), codec="copy"
        ).overwrite_output().run()

        print("(!) Success , output : " + str(output))

    def decode_write_txt(self, carrier: cv2.VideoCapture, output_txt: str):
        """Decode the secret from the encoded video.

        Args:
            carrier: The encoded video mp4. Read with
                `stegobox.io.video.read()`.
            output_txt: Method will write the secret message to this path.
        """
        self._frame_extract(carrier)
        self._decode_frame(self.tmp_frame, self.caesarn, output_txt)