Skip to content

DwtDctSvdHideStrinWAV

stegobox.codec.DwtDctSvdHideStrinWAV

DwtDctSvd - embedding algorithm that creates stealthy steganographic audio and decodes them without relying on the original audio.

Process: First, prepare the audio data. The audio file is read as bytes and converted to a bytearray audio_data of size N. The audio is then treated as a grayscale image: N is factored as a * b, and the leading (a//block_size * block_size) * (b//block_size * block_size) bytes of audio_data are reshaped into a matrix dwt_data of size [a//block_size * block_size, b//block_size * block_size]. Second, take the low-frequency part of the image after the DWT transformation, and then use DCT and SVD to hide the secret information in that low-frequency part.

Performance - DwtDctSvd is 3x slower than MaxDct.

Original implementation reports 1500ms-2s encoding time and ~1s decoding time for 1920x1080 images on CPU, not suitable for on-the-fly embedding for large images.

Originally implemented in ShieldMnt/invisible-watermark and is used inside CompVis/stable-diffusion.

Source code in stegobox/codec/dwtdctsvd_hide_str_in_wav.py
class DwtDctSvdHideStrinWAV:
    """DwtDctSvd - embedding algorithm that creates stealthy steganographic audio and
    decodes them without relying on the original audio.

    Process: First, the audio frames are read as raw bytes (N bytes in total). N is
    factored as `a * b`, both factors are rounded down to a multiple of the block
    size, and the leading `rows * cols` bytes are viewed as a grayscale image of
    shape `[rows, cols]`. The trailing bytes are passed through untouched.
        Second, the image is decomposed with a single-level Haar DWT. The payload
    bits are hidden, one per block, in the largest singular value of the DCT of each
    block of the `h1` sub-band (the first detail band returned by `pywt.dwt2`).

    Tip: Performance - DwtDctSvd is 3x slower than MaxDct.
        Original implementation reports 1500ms-2s encoding time and ~1s decoding time
        for 1920x1080 images on CPU, not suitable for on-the-fly embedding for large
        images.

    Originally implemented in
    [ShieldMnt/invisible-watermark](https://github.com/ShieldMnt/invisible-watermark)
    and is used inside
    [CompVis/stable-diffusion](https://github.com/CompVis/stable-diffusion).
    """

    def __init__(self, block: int = 4) -> None:
        """Frequency embedding method - DwtDctSvd uses DWT and DCT transforms, SVD
        decomposition of each block, to embed payload bits into singular value
        decomposition coefficients.

        Args:
            block: Side length of the square `block x block` tiles the DWT sub-band
                is cut into; one payload bit is written per tile. Defaults to 4.
        """
        self.block = block

    def payload_in_bytes(self, payload: str) -> list[np.uint8]:
        """Convert the payload string into the bit sequence of its UTF-8 encoding.

        Every byte is expanded most significant bit first, e.g. `"A"` (65) becomes
        `[0, 1, 0, 0, 0, 0, 0, 1]`.

        Args:
            payload: The string to hide.

        Returns:
            The payload bits, eight per encoded byte.
        """
        encoded = np.frombuffer(payload.encode("utf-8"), dtype=np.uint8)
        return list(np.unpackbits(encoded))

    def reconstruct_bytes(self, bits: np.ndarray, length: int) -> str:
        """Pack decoded bits back into bytes and decode them as UTF-8.

        Args:
            bits: The recovered payload bits, most significant bit of each byte
                first.
            length: Payload length in bits; only `length // 8` whole bytes are used.

        Returns:
            The decoded payload string.

        Raises:
            UnicodeDecodeError: If the recovered bytes are not valid UTF-8.
        """
        return np.packbits(bits)[: length // 8].tobytes().decode("utf-8")

    def _spartnum(self, num: int) -> tuple[int, int]:
        """Break num into a product of two integers that are as close as possible.

        Example: for `num=225` the output is `(15, 15)`; for `num=10` it is
        `(5, 2)`.

        Args:
            num: The positive integer to be decomposed.

        Returns:
            Two integers whose product is num. The first one is the smallest divisor
            of num that is not below `int(num ** 0.5)`.

        Raises:
            ValueError: If num is not positive.
        """
        if num < 1:
            raise ValueError("Cannot factor a non-positive length; is the carrier empty?")

        root = int(num**0.5)
        # Walk down from the square root: the largest divisor <= root is found in at
        # most `root` steps, and its cofactor is the smallest divisor above the root.
        small = root
        while num % small:
            small -= 1
        large = num // small

        # Keep the historical ordering so carriers encoded earlier still decode:
        # (root, cofactor) when the root itself divides num, else (larger, smaller).
        if small == root:
            return small, large
        return large, small

    def _carrier_matrix(self, carrier: wave.Wave_read) -> tuple[np.ndarray, bytes]:
        """Read all frames of the carrier and view them as a 2-D float32 matrix.

        Args:
            carrier: The opened WAV file to read from.

        Returns:
            The `[rows, cols]` matrix built from the leading bytes (both dimensions
            are multiples of `self.block`) and the trailing bytes that did not fit.

        Raises:
            ValueError: If the carrier is too short to form a single block row or
                column.
        """
        audio_data = carrier.readframes(carrier.getnframes())
        row, col = self._spartnum(len(audio_data))
        row = (row // self.block) * self.block
        col = (col // self.block) * self.block
        used = row * col
        if used == 0:
            raise ValueError("Carrier audio is too short for the configured block size")

        matrix = np.frombuffer(audio_data, dtype=np.uint8, count=used)
        # astype copies, so the matrix is writable and independent of the raw frames.
        matrix = matrix.reshape(row, col).astype(np.float32)
        return matrix, bytes(audio_data[used:])

    @staticmethod
    def _to_uint8(samples: np.ndarray) -> np.ndarray:
        """Flatten samples and saturate them into the 0..255 byte range.

        Both bounds are clamped: the inverse transform can undershoot below zero,
        and casting a negative float to uint8 would wrap around instead.
        """
        return np.clip(samples, 0, 255).reshape(-1).astype(np.uint8)

    def _block_windows(self, shape: tuple[int, int]) -> list[tuple[slice, slice]]:
        """List the index windows of all full blocks of a frame in row-major order."""
        rows, cols = shape
        size = self.block
        return [
            (slice(i * size, (i + 1) * size), slice(j * size, (j + 1) * size))
            for i in range(rows // size)
            for j in range(cols // size)
        ]

    def _check_capacity(self, wm_len: int, frame: np.ndarray) -> None:
        """Ensure every payload bit gets at least one block of the frame.

        Raises:
            ValueError: If the payload is empty or has more bits than the frame has
                blocks.
        """
        if wm_len < 1:
            raise ValueError("Payload must contain at least one bit")
        rows, cols = frame.shape
        capacity = (rows // self.block) * (cols // self.block)
        if wm_len > capacity:
            raise ValueError(
                f"Payload of {wm_len} bits exceeds the carrier capacity of "
                f"{capacity} bits"
            )

    def encode(
        self, carrier: wave.Wave_read, payload: str
    ) -> tuple[bytes, wave._wave_params, int]:
        """Encodes payload string into an audio with DWT + DCT transform and SVD.

        Args:
            carrier: Carrier audio in format WAV. Read with `stegobox.io.audio.read()`.
            payload: The payload string.

        Returns:
            The encoded wav audio.
            The headparams of audio.
            Length of payload in bits, you will need this when decoding.

        Raises:
            ValueError: If the carrier is too short, or the payload is empty or does
                not fit into the carrier.
        """
        payload_seq = self.payload_in_bytes(payload)
        payload_len = len(payload_seq)
        wave_head = carrier.getparams()
        dct_data, left_data = self._carrier_matrix(carrier)

        print(f"Payload length: {payload_len} bits")

        c1, (h1, v1, d1) = pywt.dwt2(dct_data, "haar")
        # Only the h1 sub-band is modified (in place); 18 is the quantisation step.
        self._encode_frame(payload_seq, payload_len, h1, 18)
        stego = pywt.idwt2((c1, (h1, v1, d1)), "haar")

        # Bytes that did not fit into the matrix are appended unchanged.
        outdata = self._to_uint8(stego).tobytes() + left_data
        return outdata, wave_head, payload_len

    def decode(self, _):
        """Unsupported: the payload length is required, use `decode_with_length`."""
        raise NotImplementedError("This codec does not support decoding without length")

    def decode_with_length(self, carrier: wave.Wave_read, payload_len: int) -> str:
        """Try to decode payload from an audio with a DWT + DCT transform and SVD.

        Args:
            carrier: The encoded steganographic audio file.
            payload_len: The length of your payload in bits.

        Returns:
            The extracted payload string if successful.

        Raises:
            ValueError: If the carrier is too short, or payload_len is not positive
                or exceeds the number of blocks in the carrier.
        """
        dct_data, _ = self._carrier_matrix(carrier)

        scores: list[list[int]] = [[] for _ in range(payload_len)]

        _, (h1, _, _) = pywt.dwt2(dct_data, "haar")
        scores = self._decode_frame(payload_len, h1, 18, scores)

        # Each bit was written to several blocks; average the per-block votes.
        avg_scores = np.array([np.mean(votes) for votes in scores])
        bits = avg_scores * 255 > 127
        return self.reconstruct_bytes(bits, payload_len)

    def _decode_frame(
        self, wm_len: int, frame: np.ndarray, scale: int, scores: list[list[int]]
    ) -> list[list[int]]:
        """Collect one 0/1 vote per block for the payload bit that block carries.

        Args:
            wm_len: Payload length in bits.
            frame: The 2-D sub-band to read from.
            scale: Quantisation step used when the payload was embedded.
            scores: One list per payload bit; votes are appended in place.

        Returns:
            The same `scores` object, with block `n` voting for bit `n % wm_len`.

        Raises:
            ValueError: If wm_len is not positive or exceeds the number of blocks.
        """
        self._check_capacity(wm_len, frame)
        for num, window in enumerate(self._block_windows(frame.shape)):
            scores[num % wm_len].append(self._infer_dct_svd(frame[window], scale))
        return scores

    def _diffuse_dct_svd(
        self, block: np.ndarray, wm_bit: np.uint8, scale: int
    ) -> np.ndarray:
        """Embed one bit into a block and return the modified block.

        The largest singular value of the block's DCT is moved onto the quantisation
        grid so that its remainder modulo `scale` is `0.25 * scale` for bit 0 and
        `0.75 * scale` for bit 1.
        """
        u, s, v = np.linalg.svd(cv2.dct(block))

        s[0] = (s[0] // scale + 0.25 + 0.5 * wm_bit) * scale

        return cv2.idct(np.dot(u, np.dot(np.diag(s), v)))

    def _infer_dct_svd(self, block: np.ndarray, scale: int) -> int:
        """Read one bit from a block.

        Returns:
            1 if the largest singular value of the block's DCT has a remainder
            modulo `scale` above `0.5 * scale`, otherwise 0.
        """
        _, s, _ = np.linalg.svd(cv2.dct(block))
        return int((s[0] % scale) > scale * 0.5)

    def _encode_frame(
        self, wm: list[np.uint8], wm_len: int, frame: np.ndarray, scale: int
    ) -> None:
        """Embed the watermark bits into `frame` in place.

        frame is a matrix (M, N) that is cut into blocks (self.block x self.block).
        The n-th block, in row-major order, receives bit `wm[n % wm_len]`, so the
        payload is repeated until the blocks run out.

        Raises:
            ValueError: If wm_len is not positive or exceeds the number of blocks.
        """
        self._check_capacity(wm_len, frame)
        for num, window in enumerate(self._block_windows(frame.shape)):
            frame[window] = self._diffuse_dct_svd(
                frame[window], wm[num % wm_len], scale
            )

__init__(block=4)

Frequency embedding method - DwtDctSvd uses DWT and DCT transforms, SVD decomposition of each block, to embed payload bits into singular value decomposition coefficients.

Parameters:

Name Type Description Default
block int

Side length of the square blocks (block x block) that the DWT sub-band is split into. Defaults to 4.

4
Source code in stegobox/codec/dwtdctsvd_hide_str_in_wav.py
def __init__(self, block: int = 4) -> None:
    """Frequency embedding method - DwtDctSvd uses DWT and DCT transforms, SVD
    decomposition of each block, to embed payload bits into singular value
    decomposition coefficients.

    Args:
        block: Side length of the square `block x block` tiles the codec cuts the
            transformed carrier into; the carrier dimensions are also rounded
            down to a multiple of it. Defaults to 4.
    """
    self.block = block

payload_in_bytes(payload)

Transform the string into the bit sequence of its UTF-8 encoding, e.g. 'A' (65) -> 0 1 0 0 0 0 0 1.

Parameters:

Name Type Description Default
payload str

The payload string to convert.

required

Returns:

Type Description
list[uint8]

The bits of the UTF-8 encoded payload, most significant bit of each byte first.

Source code in stegobox/codec/dwtdctsvd_hide_str_in_wav.py
def payload_in_bytes(self, payload: str) -> list[np.uint8]:
    """Convert the payload string into the bit sequence of its UTF-8 encoding.

    Every byte is expanded most significant bit first, e.g. `"A"` (65) becomes
    `[0, 1, 0, 0, 0, 0, 0, 1]`.

    Args:
        payload: The string to convert.

    Returns:
        The payload bits, eight per encoded byte.
    """
    encoded = np.frombuffer(payload.encode("utf-8"), dtype=np.uint8)
    return list(np.unpackbits(encoded))

encode(carrier, payload)

Encodes payload string into an audio with DWT + DCT transform and SVD.

Parameters:

Name Type Description Default
carrier Wave_read

Carrier audio in format WAV. Read with stegobox.io.audio.read().

required
payload str

The payload string.

required

Returns:

Type Description
bytes

The encoded wav audio.

_wave_params

The headparams of audio.

int

Length of payload in bits, you will need this when decoding.

Source code in stegobox/codec/dwtdctsvd_hide_str_in_wav.py
def encode(
    self, carrier: wave.Wave_read, payload: str
) -> tuple[bytes, wave._wave_params, int]:
    """Encodes payload string into an audio with DWT + DCT transform and SVD.

    The leading `row * col` bytes of the carrier are treated as a 2-D matrix whose
    dimensions are multiples of `self.block`; the remaining bytes are appended to
    the output unchanged.

    Args:
        carrier: Carrier audio in format WAV. Read with `stegobox.io.audio.read()`.
        payload: The payload string.

    Returns:
        The encoded wav audio.
        The headparams of audio.
        Length of payload in bits, you will need this when decoding.
    """
    payload_seq = self.payload_in_bytes(payload)
    payload_len = len(payload_seq)
    wave_head = carrier.getparams()
    audio_data = bytearray(carrier.readframes(carrier.getnframes()))
    row, col = self._spartnum(len(audio_data))
    row = (row // self.block) * self.block
    col = (col // self.block) * self.block
    zong_length = row * col
    # Bytes that do not fit into the matrix are passed through untouched.
    left_data = audio_data[zong_length:]

    # View the carrier bytes as a (row, col) float matrix for the transforms.
    dct_data = np.array(audio_data[:zong_length]).reshape(row, col)
    dct_data = dct_data.astype(np.float32)

    print(f"Payload length: {payload_len} bits")

    c1, (h1, v1, d1) = pywt.dwt2(dct_data, "haar")
    # Only the h1 sub-band is modified (in place); 18 is the quantisation step.
    self._encode_frame(payload_seq, payload_len, h1, 18)
    dct_data = pywt.idwt2((c1, (h1, v1, d1)), "haar")

    # Saturate on both sides: the inverse transform can undershoot below zero, and
    # casting a negative float to uint8 would wrap around instead of clamping.
    dct_data = np.clip(dct_data, 0, 255).reshape(-1).astype(np.uint8)

    outdata = dct_data.tobytes() + bytes(left_data)
    return outdata, wave_head, payload_len

decode_with_length(carrier, payload_len)

Try to decode payload from an audio with a DWT + DCT transform and SVD.

Parameters:

Name Type Description Default
carrier Wave_read

The encoded steganographic audio file.

required
payload_len int

The length of your payload in bits.

required

Returns:

Type Description
str

The extracted payload string if successful.

Source code in stegobox/codec/dwtdctsvd_hide_str_in_wav.py
def decode_with_length(self, carrier: wave.Wave_read, payload_len: int) -> str:
    """Try to decode payload from an audio with a DWT + DCT transform and SVD.

    The carrier bytes are arranged exactly as during encoding, every block of the
    `h1` sub-band casts a 0/1 vote for the payload bit it carries, and the votes
    for each bit are averaged before thresholding.

    Args:
        carrier: The encoded steganographic audio file.
        payload_len: The length of your payload in bits.

    Returns:
        The extracted payload string if successful.
    """
    raw = bytearray(carrier.readframes(carrier.getnframes()))
    rows, cols = self._spartnum(len(raw))
    # Round both dimensions down to a multiple of the block size.
    rows -= rows % self.block
    cols -= cols % self.block
    matrix = np.array(raw[: rows * cols]).reshape(rows, cols).astype(np.float32)

    votes: list[list[int]] = [[] for _ in range(payload_len)]

    _, (h1, _, _) = pywt.dwt2(matrix, "haar")
    votes = self._decode_frame(payload_len, h1, 18, votes)

    averages = [np.array(per_bit).mean() for per_bit in votes]
    bits = np.array(averages) * 255 > 127
    return self.reconstruct_bytes(bits, payload_len)