Bases: BaseCodec
This module implements simple LSB steganography of txt in mkv videos.
with code shamelessly taken from
SaskaRandenkovic/VideoSteganography
Source code in stegobox/codec/lsb_txt_in_mkv/lsb_txtinmkv.py
| class LSBTXTinMKV(BaseCodec):
"""This module implements simple LSB steganography of txt in mkv videos.
with code shamelessly taken from
[SaskaRandenkovic/VideoSteganography](https://github.com/SaskaRandenkovic/VideoSteganography)
"""
def __init__(self) -> None:
super().__init__()
def encode(
self, carrier: cv2.VideoCapture, payload: str
) -> tuple[list[numpy.ndarray], int, tuple[int, int]]:
"""Encoder requires the carrier video to be videocapture and payload a string.
Args:
carrier: Carrier video in format MKV. Read with `stegobox.io.video.read()`.
payload: Payload (secret message) to be encoded.
Returns:
A list of encrypted collection of images.
Video framerate.
Video size.
"""
message = payload
message = message.replace("\n", "||||||||||||")
cap = carrier
fps = cap.get(cv2.CAP_PROP_FPS)
count = 100000
with TemporaryDirectory() as dir:
while cap.isOpened():
ret, frame = cap.read()
if ret:
cv2.imwrite(f"{dir}/frame{count:d}.png", frame)
count += 1
else:
break
i = 100000
l1 = 0
l2 = 250
if l2 >= len(message):
l2 = len(message)
while True:
msg = message[l1:l2]
imgoc = f"{dir}/frame{i}.png"
secret_enc = lsb_hide(input_image=imgoc, message=msg)
if secret_enc is not None:
secret_enc.save(imgoc)
else:
raise ValueError("Sorry, carrier should not be None image.")
if l2 == len(message):
break
l1 = l2
l2 = l2 + 250
if l2 >= len(message):
l2 = len(message)
i += 1
i += 1
cap.release()
# ignore .DS_Store
files = [f for f in listdir(dir) if not f.startswith(".")]
# sort files in correct order
files.sort(key=lambda x: int(x[5:-4])) # x[5:-4] is the number
img_array = []
size = (0, 0)
for f in files:
filename = f"{dir}/{f}"
img = cv2.imread(filename)
height, width, layers = img.shape
size = (width, height)
img_array.append(img)
return img_array, fps, size
def decode(self, carrier: cv2.VideoCapture) -> str:
"""Decode the secret payload from the carrier video.
Args:
carrier: Carrier video in format MKV. Read with `stegobox.io.video.read()`.
Returns:
str: The decoded payload (secret message).
"""
cap = carrier
count = 100000
with TemporaryDirectory() as dir:
while cap.isOpened():
ret, frame = cap.read()
if ret:
cv2.imwrite(f"{dir}/frame{count}.png", frame)
count += 1
else:
break
i = 100000
secret = []
while i < count:
imgoc = f"{dir}/frame{i}.png"
secret_dec = lsb_reveal(imgoc)
if secret_dec is None:
break
secret.append(secret_dec)
i += 1
msg = "".join(secret)
msg = msg.rstrip()
msg = msg.replace("||||||||||||", "\n")
cap.release()
return msg
|
encode(carrier, payload)
Encoder requires the carrier video to be videocapture and payload a string.
Parameters:
Name |
Type |
Description |
Default |
carrier |
VideoCapture
|
Carrier video in format MKV. Read with stegobox.io.video.read() .
|
required
|
payload |
str
|
Payload (secret message) to be encoded.
|
required
|
Returns:
Type |
Description |
list[ndarray]
|
A list of encrypted collection of images.
|
int
|
|
tuple[int, int]
|
|
Source code in stegobox/codec/lsb_txt_in_mkv/lsb_txtinmkv.py
| def encode(
self, carrier: cv2.VideoCapture, payload: str
) -> tuple[list[numpy.ndarray], int, tuple[int, int]]:
"""Encoder requires the carrier video to be videocapture and payload a string.
Args:
carrier: Carrier video in format MKV. Read with `stegobox.io.video.read()`.
payload: Payload (secret message) to be encoded.
Returns:
A list of encrypted collection of images.
Video framerate.
Video size.
"""
message = payload
message = message.replace("\n", "||||||||||||")
cap = carrier
fps = cap.get(cv2.CAP_PROP_FPS)
count = 100000
with TemporaryDirectory() as dir:
while cap.isOpened():
ret, frame = cap.read()
if ret:
cv2.imwrite(f"{dir}/frame{count:d}.png", frame)
count += 1
else:
break
i = 100000
l1 = 0
l2 = 250
if l2 >= len(message):
l2 = len(message)
while True:
msg = message[l1:l2]
imgoc = f"{dir}/frame{i}.png"
secret_enc = lsb_hide(input_image=imgoc, message=msg)
if secret_enc is not None:
secret_enc.save(imgoc)
else:
raise ValueError("Sorry, carrier should not be None image.")
if l2 == len(message):
break
l1 = l2
l2 = l2 + 250
if l2 >= len(message):
l2 = len(message)
i += 1
i += 1
cap.release()
# ignore .DS_Store
files = [f for f in listdir(dir) if not f.startswith(".")]
# sort files in correct order
files.sort(key=lambda x: int(x[5:-4])) # x[5:-4] is the number
img_array = []
size = (0, 0)
for f in files:
filename = f"{dir}/{f}"
img = cv2.imread(filename)
height, width, layers = img.shape
size = (width, height)
img_array.append(img)
return img_array, fps, size
|
decode(carrier)
Decode the secret payload from the carrier video.
Parameters:
Name |
Type |
Description |
Default |
carrier |
VideoCapture
|
Carrier video in format MKV. Read with stegobox.io.video.read() .
|
required
|
Returns:
Name | Type |
Description |
str |
str
|
The decoded payload (secret message).
|
Source code in stegobox/codec/lsb_txt_in_mkv/lsb_txtinmkv.py
| def decode(self, carrier: cv2.VideoCapture) -> str:
"""Decode the secret payload from the carrier video.
Args:
carrier: Carrier video in format MKV. Read with `stegobox.io.video.read()`.
Returns:
str: The decoded payload (secret message).
"""
cap = carrier
count = 100000
with TemporaryDirectory() as dir:
while cap.isOpened():
ret, frame = cap.read()
if ret:
cv2.imwrite(f"{dir}/frame{count}.png", frame)
count += 1
else:
break
i = 100000
secret = []
while i < count:
imgoc = f"{dir}/frame{i}.png"
secret_dec = lsb_reveal(imgoc)
if secret_dec is None:
break
secret.append(secret_dec)
i += 1
msg = "".join(secret)
msg = msg.rstrip()
msg = msg.replace("||||||||||||", "\n")
cap.release()
return msg
|