Skip to content

TxtinWav

stegobox.codec.TxtinWav

Bases: BaseCodec

This algorithm uses the LSB algorithm to write the content of the txt text document to the wav audio.

  • Created by: QiuYu
  • Created time: 2022/10/07

Originally implemented in pavanchhatpar/wav-steg-py

Source code in stegobox/codec/txt_in_wav.py
class TxtinWav(BaseCodec):
    """This algorithm uses the LSB algorithm to write the content of the txt text
    document to the wav audio.

    * Created by: QiuYu
    * Created time: 2022/10/07

    Originally implemented in
    [pavanchhatpar/wav-steg-py](https://github.com/pavanchhatpar/wav-steg-py)
    """

    def __init__(self) -> None:
        super().__init__()

    def _prepare(self, soundfile, num_lsb):
        self.sound = soundfile
        self.params = self.sound.getparams()
        num_channels = self.sound.getnchannels()
        sample_width = self.sound.getsampwidth()
        self.n_frames = self.sound.getnframes()
        self.n_samples = self.n_frames * num_channels

        if sample_width == 1:  # samples are unsigned 8-bit integers
            self.fmt = "{}B".format(self.n_samples)
            # Used to set the least significant num_lsb bits of an integer to zero
            self.mask = (1 << 8) - (1 << num_lsb)
            # The least possible value for a sample in the sound file is actually
            # zero, but we don't skip any samples for 8 bit depth wav files.
            self.smallest_byte = -(1 << 8)
        elif sample_width == 2:  # samples are signed 16-bit integers
            self.fmt = "{}h".format(self.n_samples)
            # Used to set the least significant num_lsb bits of an integer to zero
            self.mask = (1 << 15) - (1 << num_lsb)
            # The least possible value for a sample in the sound file
            self.smallest_byte = -(1 << 15)
        else:
            # Python's wave module doesn't support higher sample widths
            raise ValueError("File has an unsupported bit-depth")

    def encode(
        self, carrier: wave.Wave_read, payload: bytes
    ) -> tuple[bytes, wave._wave_params, int]:
        """Encoder requires carrier audio to be WAV and payload to be a txt document.

        Args:
            carrier: Carrier audio in format WAV. Read with `stegobox.io.audio.read()`.
            payload: Payload (secret message) to be encoded. Payload in format TXT.
                Read with `stegobox.io.txt.read_bytes()`

        Returns:
            bytes: Frame information in bytes of the audio with the payload embedded.
            wave._wave_params: Parameters for audio with the payload embedded.
        """
        num_lsb = 3

        self._prepare(carrier, num_lsb)
        # We can hide up to num_lsb bits in each sample of the sound file
        max_bytes_to_hide = (self.n_samples * num_lsb) // 8
        self.filesize = len(payload)

        if self.filesize > max_bytes_to_hide:
            required_lsbs = math.ceil(self.filesize * 8 / self.n_samples)
            raise ValueError(
                "Input file too large to hide, "
                "requires {} LSBs, using {}".format(required_lsbs, num_lsb)
            )

            # print("Using {} B out of {} B".format(filesize, max_bytes_to_hide))

        # Put all the samples from the sound file into a list
        raw_data = list(struct.unpack(self.fmt, self.sound.readframes(self.n_frames)))
        self.sound.close()

        input_data = memoryview(payload)

        # The number of bits we've processed from the input file
        data_index = 0
        sound_index = 0

        # values will hold the altered sound data
        values = []
        buffer = 0
        buffer_length = 0
        done = False

        while not done:
            while buffer_length < num_lsb and data_index // 8 < len(input_data):
                # If we don't have enough data in the buffer, add the
                # rest of the next byte from the file to it.
                buffer += (
                    input_data[data_index // 8] >> (data_index % 8)
                ) << buffer_length
                bits_added = 8 - (data_index % 8)
                buffer_length += bits_added
                data_index += bits_added

            # Retrieve the next num_lsb bits from the buffer for use later
            current_data = buffer % (1 << num_lsb)
            buffer >>= num_lsb
            buffer_length -= num_lsb

            while (
                sound_index < len(raw_data)
                and raw_data[sound_index] == self.smallest_byte
            ):
                # If the next sample from the sound file is the smallest possible
                # value, we skip it. Changing the LSB of such a value could cause
                # an overflow and drastically change the sample in the output.
                values.append(struct.pack(self.fmt[-1], raw_data[sound_index]))
                sound_index += 1

            if sound_index < len(raw_data):
                current_sample = raw_data[sound_index]
                sound_index += 1

                sign = 1
                if current_sample < 0:
                    # We alter the LSBs of the absolute value of the sample to
                    # avoid problems with two's complement. This also avoids
                    # changing a sample to the smallest possible value, which we
                    # would skip when attempting to recover data.
                    current_sample = -current_sample
                    sign = -1

                # Bitwise AND with mask turns the num_lsb least significant bits
                # of current_sample to zero. Bitwise OR with current_data replaces
                # these least significant bits with the next num_lsb bits of data.
                altered_sample = sign * ((current_sample & self.mask) | current_data)

                values.append(struct.pack(self.fmt[-1], altered_sample))

            if data_index // 8 >= len(input_data) and buffer_length <= 0:
                done = True

        while sound_index < len(raw_data):
            # At this point, there's no more data to hide. So we append the rest of
            # the samples from the original sound file.
            values.append(struct.pack(self.fmt[-1], raw_data[sound_index]))
            sound_index += 1

        # io.audio.write(b"".join(values), params, output_path)
        return bytes(b"".join(values)), self.params, self.filesize

    def decode(self, _):
        raise NotImplementedError("This codec does not support decoding without length")

    def decode_with_length(self, carrier: wave.Wave_read, length: int) -> bytes:
        """Decode the secret payload from the carrier audio

        Args:
            carrier: Carrier audio in format WAV. Read with `stegobox.io.audio.read()`.
            payload_len: The length of secret message

        Returns:
            The decoded payload (secret message).
        """
        num_lsb = 3
        bytes_to_recover = length
        # Recover data from the file at sound_path to the file at output_path
        global sound, n_frames, n_samples, fmt, smallest_byte
        self._prepare(carrier, num_lsb)

        # Put all the samples from the sound file into a list
        raw_data = list(struct.unpack(self.fmt, self.sound.readframes(self.n_frames)))
        # Used to extract the least significant num_lsb bits of an integer
        mask = (1 << num_lsb) - 1

        data = bytearray()
        sound_index = 0
        buffer = 0
        buffer_length = 0
        self.sound.close()

        while bytes_to_recover > 0:

            next_sample = raw_data[sound_index]
            if next_sample != self.smallest_byte:
                # Since we skipped samples with the minimum possible value when
                # hiding data, we do the same here.
                buffer += (abs(next_sample) & mask) << buffer_length
                buffer_length += num_lsb
            sound_index += 1

            while buffer_length >= 8 and bytes_to_recover > 0:
                # If we have more than a byte in the buffer, add it to data
                # and decrement the number of bytes left to recover.
                current_data = buffer % (1 << 8)
                buffer >>= 8
                buffer_length -= 8
                data += struct.pack("1B", current_data)
                bytes_to_recover -= 1

        return bytes(data)

encode(carrier, payload)

Encoder requires carrier audio to be WAV and payload to be a txt document.

Parameters:

Name Type Description Default
carrier Wave_read

Carrier audio in format WAV. Read with stegobox.io.audio.read().

required
payload bytes

Payload (secret message) to be encoded. Payload in format TXT. Read with stegobox.io.txt.read_bytes()

required

Returns:

Name Type Description
bytes bytes

Frame information in bytes of the audio with the payload embedded.

_wave_params

wave._wave_params: Parameters for audio with the payload embedded.

Source code in stegobox/codec/txt_in_wav.py
def encode(
    self, carrier: wave.Wave_read, payload: bytes
) -> tuple[bytes, wave._wave_params, int]:
    """Encoder requires carrier audio to be WAV and payload to be a txt document.

    Args:
        carrier: Carrier audio in format WAV. Read with `stegobox.io.audio.read()`.
        payload: Payload (secret message) to be encoded. Payload in format TXT.
            Read with `stegobox.io.txt.read_bytes()`

    Returns:
        bytes: Frame information in bytes of the audio with the payload embedded.
        wave._wave_params: Parameters for audio with the payload embedded.
    """
    num_lsb = 3

    self._prepare(carrier, num_lsb)
    # We can hide up to num_lsb bits in each sample of the sound file
    max_bytes_to_hide = (self.n_samples * num_lsb) // 8
    self.filesize = len(payload)

    if self.filesize > max_bytes_to_hide:
        required_lsbs = math.ceil(self.filesize * 8 / self.n_samples)
        raise ValueError(
            "Input file too large to hide, "
            "requires {} LSBs, using {}".format(required_lsbs, num_lsb)
        )

        # print("Using {} B out of {} B".format(filesize, max_bytes_to_hide))

    # Put all the samples from the sound file into a list
    raw_data = list(struct.unpack(self.fmt, self.sound.readframes(self.n_frames)))
    self.sound.close()

    input_data = memoryview(payload)

    # The number of bits we've processed from the input file
    data_index = 0
    sound_index = 0

    # values will hold the altered sound data
    values = []
    buffer = 0
    buffer_length = 0
    done = False

    while not done:
        while buffer_length < num_lsb and data_index // 8 < len(input_data):
            # If we don't have enough data in the buffer, add the
            # rest of the next byte from the file to it.
            buffer += (
                input_data[data_index // 8] >> (data_index % 8)
            ) << buffer_length
            bits_added = 8 - (data_index % 8)
            buffer_length += bits_added
            data_index += bits_added

        # Retrieve the next num_lsb bits from the buffer for use later
        current_data = buffer % (1 << num_lsb)
        buffer >>= num_lsb
        buffer_length -= num_lsb

        while (
            sound_index < len(raw_data)
            and raw_data[sound_index] == self.smallest_byte
        ):
            # If the next sample from the sound file is the smallest possible
            # value, we skip it. Changing the LSB of such a value could cause
            # an overflow and drastically change the sample in the output.
            values.append(struct.pack(self.fmt[-1], raw_data[sound_index]))
            sound_index += 1

        if sound_index < len(raw_data):
            current_sample = raw_data[sound_index]
            sound_index += 1

            sign = 1
            if current_sample < 0:
                # We alter the LSBs of the absolute value of the sample to
                # avoid problems with two's complement. This also avoids
                # changing a sample to the smallest possible value, which we
                # would skip when attempting to recover data.
                current_sample = -current_sample
                sign = -1

            # Bitwise AND with mask turns the num_lsb least significant bits
            # of current_sample to zero. Bitwise OR with current_data replaces
            # these least significant bits with the next num_lsb bits of data.
            altered_sample = sign * ((current_sample & self.mask) | current_data)

            values.append(struct.pack(self.fmt[-1], altered_sample))

        if data_index // 8 >= len(input_data) and buffer_length <= 0:
            done = True

    while sound_index < len(raw_data):
        # At this point, there's no more data to hide. So we append the rest of
        # the samples from the original sound file.
        values.append(struct.pack(self.fmt[-1], raw_data[sound_index]))
        sound_index += 1

    # io.audio.write(b"".join(values), params, output_path)
    return bytes(b"".join(values)), self.params, self.filesize

decode_with_length(carrier, length)

Decode the secret payload from the carrier audio

Parameters:

Name Type Description Default
carrier Wave_read

Carrier audio in format WAV. Read with stegobox.io.audio.read().

required
payload_len

The length of secret message

required

Returns:

Type Description
bytes

The decoded payload (secret message).

Source code in stegobox/codec/txt_in_wav.py
def decode_with_length(self, carrier: wave.Wave_read, length: int) -> bytes:
    """Decode the secret payload from the carrier audio

    Args:
        carrier: Carrier audio in format WAV. Read with `stegobox.io.audio.read()`.
        payload_len: The length of secret message

    Returns:
        The decoded payload (secret message).
    """
    num_lsb = 3
    bytes_to_recover = length
    # Recover data from the file at sound_path to the file at output_path
    global sound, n_frames, n_samples, fmt, smallest_byte
    self._prepare(carrier, num_lsb)

    # Put all the samples from the sound file into a list
    raw_data = list(struct.unpack(self.fmt, self.sound.readframes(self.n_frames)))
    # Used to extract the least significant num_lsb bits of an integer
    mask = (1 << num_lsb) - 1

    data = bytearray()
    sound_index = 0
    buffer = 0
    buffer_length = 0
    self.sound.close()

    while bytes_to_recover > 0:

        next_sample = raw_data[sound_index]
        if next_sample != self.smallest_byte:
            # Since we skipped samples with the minimum possible value when
            # hiding data, we do the same here.
            buffer += (abs(next_sample) & mask) << buffer_length
            buffer_length += num_lsb
        sound_index += 1

        while buffer_length >= 8 and bytes_to_recover > 0:
            # If we have more than a byte in the buffer, add it to data
            # and decrement the number of bytes left to recover.
            current_data = buffer % (1 << 8)
            buffer >>= 8
            buffer_length -= 8
            data += struct.pack("1B", current_data)
            bytes_to_recover -= 1

    return bytes(data)