Skip to content

AMVEM

stegobox.codec.AMVEM

Bases: BaseCodec

This module implements AMVEM steganography, hiding a text payload in AVI videos.

The implementation is adapted from razramon/video-Steganography.

Source code in stegobox/codec/amvem_txtinavi.py
class AMVEM(BaseCodec):
    """AMVEM steganography codec that hides text in AVI videos.

    After two untouched lead-in frames the carrier is processed in pairs of
    frames. The first frame of a pair is kept as a reference; the second one is
    replaced by a copy of the reference carrying low-amplitude noise. Each pair
    stores one payload character as 8 bits: the frame is cut into 8 vertical
    strips (one per bit, most significant bit on the left) and the noise is
    placed in the top half of a strip for a ``0`` and in the bottom half for a
    ``1``.

    with code adapted from
    [razramon/video-Steganography](https://github.com/razramon/video-Steganography)
    """

    def __init__(self) -> None:
        super().__init__()

    def encode(
        self, carrier: cv2.VideoCapture, payload: str
    ) -> tuple[list[numpy.ndarray], int, tuple[int, int]]:
        """Embed ``payload`` into the frames of ``carrier``.

        Args:
            carrier: Carrier video in format AVI. Read with `stegobox.io.video.read()`.
                The capture is released before this method returns.
            payload: Payload (secret message) to be encoded. One character is
                stored per frame pair as a single byte, so every character must
                have a code point in the range 0-255.

        Returns:
            A list of frames with the payload embedded.
            Video framerate (truncated to an integer).
            Video size as ``(width, height)``.

        Raises:
            ValueError: If ``payload`` contains a character whose code point
                does not fit into one byte.
        """
        import warnings

        # Validate and convert the whole payload before touching the carrier.
        # Formatting the code point directly avoids the stray "0b" prefixes
        # that joining ``bin()`` of several UTF-8 bytes would leave behind.
        bit_groups = []
        for char in payload:
            code_point = ord(char)
            if code_point > 0xFF:
                raise ValueError(
                    f"payload character {char!r} cannot be stored in one byte"
                )
            bit_groups.append(format(code_point, "08b"))

        # We convert the resolutions from float to integer.
        frame_width = int(carrier.get(cv2.CAP_PROP_FRAME_WIDTH))
        frame_height = int(carrier.get(cv2.CAP_PROP_FRAME_HEIGHT))
        fps = int(carrier.get(cv2.CAP_PROP_FPS))

        img_array = []
        embedded = 0
        try:
            # The first 2 frames are copied through unchanged.
            for _ in range(2):
                ret, frame = carrier.read()
                if not ret:
                    break
                img_array.append(frame.copy())

            while carrier.isOpened():
                # Reference frame of the pair, stored untouched.
                ret, frame = carrier.read()
                if not ret:
                    break
                img_array.append(frame.copy())

                # The second carrier frame of the pair is consumed but not
                # kept: its slot is filled with a (noised) copy of the
                # reference so the decoder can diff the two.
                ret, _replaced = carrier.read()
                if not ret:
                    break
                if embedded < len(bit_groups):
                    frame = self.encode_frame(
                        frame, bit_groups[embedded], frame_width, frame_height
                    )
                    embedded += 1
                img_array.append(frame.copy())
        finally:
            # Release the video capture object even if embedding failed.
            carrier.release()

        if embedded < len(bit_groups):
            warnings.warn(
                f"carrier too short: only {embedded} of {len(bit_groups)} "
                "payload characters were embedded",
                RuntimeWarning,
                stacklevel=2,
            )

        return img_array, fps, (frame_width, frame_height)

    def decode(self, carrier: cv2.VideoCapture) -> str:
        """Decode the secret payload from the carrier video.

        Args:
            carrier: Carrier video in format AVI. Read with `stegobox.io.video.read()`.
                The capture is released before this method returns.

        Returns:
            str: The decoded payload (secret message).
        """
        frame_width = int(carrier.get(cv2.CAP_PROP_FRAME_WIDTH))
        frame_height = int(carrier.get(cv2.CAP_PROP_FRAME_HEIGHT))
        half_height = frame_height // 2
        # Strip boundaries computed exactly like in encode_frame, so both
        # sides agree even when the width is not a multiple of 8.
        col_edges = [k * frame_width // 8 for k in range(9)]

        chars = []
        try:
            # Not including first 2 frames
            for _ in range(2):
                ret, _skipped = carrier.read()
                if not ret:
                    break

            while carrier.isOpened():
                ret, original_frame = carrier.read()
                if not ret:
                    break
                ret, edit_frame = carrier.read()
                if not ret:
                    break

                # absdiff instead of subtract: the encoder *adds* to the
                # pixels, and a saturating ``original - edited`` would clamp
                # exactly those changes to zero.
                diff = cv2.absdiff(original_frame, edit_frame)

                bits = []
                for k in range(8):
                    left, right = col_edges[k], col_edges[k + 1]
                    top = diff[0:half_height, left:right]
                    bottom = diff[half_height : 2 * half_height, left:right]
                    changed_top = sum(
                        cv2.countNonZero(channel) for channel in cv2.split(top)
                    )
                    changed_bottom = sum(
                        cv2.countNonZero(channel) for channel in cv2.split(bottom)
                    )
                    # If the noise was on the top half, it is 0, otherwise 1.
                    if changed_top > changed_bottom:
                        bits.append("0")
                    elif changed_top < changed_bottom:
                        bits.append("1")
                    else:
                        # No usable bit in this strip: treat it as the end of
                        # the message and drop the partial character.
                        bits = None
                        break

                if bits is None:
                    break
                chars.append(chr(int("".join(bits), 2)))
        finally:
            # Release the video capture object even if decoding failed.
            carrier.release()

        return "".join(chars)

    def encode_frame(
        self, frame: numpy.ndarray, byte_letter: str, width: int, height: int
    ) -> numpy.ndarray:
        """Embed one byte into ``frame`` by adding sparse noise, in place.

        Args:
            frame: Frame to modify; indexed as ``frame[row, column]`` with three
                channel values per pixel.
            byte_letter: String of ``"0"``/``"1"`` characters, one per strip.
                The strips are ``width / 8`` wide, so at most 8 bits fit.
            width: Frame width in pixels.
            height: Frame height in pixels.

        Returns:
            The same ``frame`` object, after modification.
        """
        # Fraction of the pixels of each selected region that receive noise.
        noise = 0.1
        for position, bit in enumerate(byte_letter):
            bit_value = int(bit)
            # width/8 is the strip of this bit; the bit value selects the
            # top (0) or bottom (1) half of that strip.
            col_start = int(position * width / 8)
            col_stop = int((position + 1) * width / 8)
            row_start = int(bit_value * height / 2)
            row_stop = int((bit_value + 1) * height / 2)
            for col in range(col_start, col_stop):
                for row in range(row_start, row_stop):
                    # Only about 10% of each region is noised
                    # so the frame will not be seen as damaged.
                    if random.random() < noise:
                        pixel = frame[row, col]
                        # Work on Python ints and clamp at 255 so bright
                        # 8-bit pixels saturate instead of wrapping to dark.
                        frame[row, col] = (
                            min(int(pixel[0]) + random.randint(1, 5), 255),
                            min(int(pixel[1]) + random.randint(1, 5), 255),
                            min(int(pixel[2]) + random.randint(1, 5), 255),
                        )
        return frame

encode(carrier, payload)

The encoder requires the carrier video to be a `cv2.VideoCapture` and the payload to be a string.

Parameters:

Name Type Description Default
carrier VideoCapture

Carrier video in format AVI. Read with stegobox.io.video.read().

required
payload str

Payload (secret message) to be encoded.

required

Returns:

Type Description
list[ndarray]

A list of video frames (images) with the payload embedded.

int

Video framerate.

tuple[int, int]

Video size.

Source code in stegobox/codec/amvem_txtinavi.py
def encode(
    self, carrier: cv2.VideoCapture, payload: str
) -> tuple[list[numpy.ndarray], int, tuple[int, int]]:
    """Embed ``payload`` into the frames of ``carrier``.

    After two untouched lead-in frames the carrier is read in pairs. The first
    frame of a pair is stored as-is; the second one is replaced by a copy of
    the first that carries one payload character (while characters remain).

    Args:
        carrier: Carrier video in format AVI. Read with `stegobox.io.video.read()`.
            The capture is released before this method returns.
        payload: Payload (secret message) to be encoded. One character is
            stored per frame pair as a single byte, so every character must
            have a code point in the range 0-255.

    Returns:
        A list of frames with the payload embedded.
        Video framerate (truncated to an integer).
        Video size as ``(width, height)``.

    Raises:
        ValueError: If ``payload`` contains a character whose code point
            does not fit into one byte.
    """
    import warnings

    # Validate and convert the whole payload before touching the carrier.
    # Formatting the code point directly avoids the stray "0b" prefixes
    # that joining ``bin()`` of several UTF-8 bytes would leave behind.
    bit_groups = []
    for char in payload:
        code_point = ord(char)
        if code_point > 0xFF:
            raise ValueError(
                f"payload character {char!r} cannot be stored in one byte"
            )
        bit_groups.append(format(code_point, "08b"))

    # We convert the resolutions from float to integer.
    frame_width = int(carrier.get(cv2.CAP_PROP_FRAME_WIDTH))
    frame_height = int(carrier.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = int(carrier.get(cv2.CAP_PROP_FPS))

    img_array = []
    embedded = 0
    try:
        # The first 2 frames are copied through unchanged.
        for _ in range(2):
            ret, frame = carrier.read()
            if not ret:
                break
            img_array.append(frame.copy())

        while carrier.isOpened():
            # Reference frame of the pair, stored untouched.
            ret, frame = carrier.read()
            if not ret:
                break
            img_array.append(frame.copy())

            # The second carrier frame of the pair is consumed but not kept:
            # its slot is filled with a (noised) copy of the reference so the
            # decoder can diff the two.
            ret, _replaced = carrier.read()
            if not ret:
                break
            if embedded < len(bit_groups):
                frame = self.encode_frame(
                    frame, bit_groups[embedded], frame_width, frame_height
                )
                embedded += 1
            img_array.append(frame.copy())
    finally:
        # Release the video capture object even if embedding failed.
        carrier.release()

    if embedded < len(bit_groups):
        warnings.warn(
            f"carrier too short: only {embedded} of {len(bit_groups)} "
            "payload characters were embedded",
            RuntimeWarning,
            stacklevel=2,
        )

    return img_array, fps, (frame_width, frame_height)

decode(carrier)

Decode the secret payload from the carrier video.

Parameters:

Name Type Description Default
carrier VideoCapture

Carrier video in format AVI. Read with stegobox.io.video.read().

required

Returns:

Name Type Description
str str

The decoded payload (secret message).

Source code in stegobox/codec/amvem_txtinavi.py
def decode(self, carrier: cv2.VideoCapture) -> str:
    """Decode the secret payload from the carrier video.

    The first two frames are skipped; the rest is read in pairs of
    (reference frame, edited frame). For each pair the frame is cut into 8
    vertical strips, one per bit, and the half of the strip containing more
    changed pixel values decides the bit (top half: 0, bottom half: 1).
    Decoding stops at the first strip where both halves changed equally.

    Args:
        carrier: Carrier video in format AVI. Read with `stegobox.io.video.read()`.
            The capture is released before this method returns.

    Returns:
        str: The decoded payload (secret message).
    """
    frame_width = int(carrier.get(cv2.CAP_PROP_FRAME_WIDTH))
    frame_height = int(carrier.get(cv2.CAP_PROP_FRAME_HEIGHT))
    half_height = frame_height // 2
    # Strip boundaries at k * width / 8 (rounded down), so they stay aligned
    # with an encoder using the same formula even when the width is not a
    # multiple of 8.
    col_edges = [k * frame_width // 8 for k in range(9)]

    chars = []
    try:
        # Not including first 2 frames
        for _ in range(2):
            ret, _skipped = carrier.read()
            if not ret:
                break

        while carrier.isOpened():
            ret, original_frame = carrier.read()
            if not ret:
                break
            ret, edit_frame = carrier.read()
            if not ret:
                break

            # absdiff instead of subtract: a saturating
            # ``original - edited`` clamps every pixel that got brighter in
            # the edited frame to zero and would hide those changes.
            diff = cv2.absdiff(original_frame, edit_frame)

            bits = []
            for k in range(8):
                left, right = col_edges[k], col_edges[k + 1]
                top = diff[0:half_height, left:right]
                bottom = diff[half_height : 2 * half_height, left:right]
                changed_top = sum(
                    cv2.countNonZero(channel) for channel in cv2.split(top)
                )
                changed_bottom = sum(
                    cv2.countNonZero(channel) for channel in cv2.split(bottom)
                )
                # If the noise was on the top half, it is 0, otherwise 1.
                if changed_top > changed_bottom:
                    bits.append("0")
                elif changed_top < changed_bottom:
                    bits.append("1")
                else:
                    # No usable bit in this strip: treat it as the end of the
                    # message and drop the partial character.
                    bits = None
                    break

            if bits is None:
                break
            chars.append(chr(int("".join(bits), 2)))
    finally:
        # Release the video capture object even if decoding failed.
        carrier.release()

    return "".join(chars)