Skip to content

DwtDctSvd

stegobox.codec.DwtDctSvd

Bases: BaseCodec

DwtDctSvd - embedding algorithm that creates stealthy steganographic images and decodes them without relying on the original image.

Performance - DwtDctSvd is 3x slower than MaxDct.

Original implementation reports 1500ms-2s encoding time and ~1s decoding time for 1920x1080 images on CPU, not suitable for on-the-fly embedding for large images.

Originally implemented in ShieldMnt/invisible-watermark and is used inside CompVis/stable-diffusion.

Source code in stegobox/codec/invisible_watermark/dwt_dct_svd.py
class DwtDctSvd(BaseCodec):
    """DwtDctSvd - embedding algorithm that creates stealthy steganographic images and
    decodes them without relying on the original image.

    Tip: Performance - DwtDctSvd is 3x slower than MaxDct.
        Original implementation reports 1500ms-2s encoding time and ~1s decoding time
        for 1920x1080 images on CPU, not suitable for on-the-fly embedding for large
        images.

    Originally implemented in
    [ShieldMnt/invisible-watermark](https://github.com/ShieldMnt/invisible-watermark)
    and is used inside
    [CompVis/stable-diffusion](https://github.com/CompVis/stable-diffusion).
    """

    def __init__(self, scales: list[int] = [0, 36, 0], block: int = 4) -> None:
        """Frequency embedding method - DwtDctSvd uses DWT and DCT transforms, SVD
        decomposition of each block, to embed payload bits into singular value
        decomposition coefficients.

        Args:
            scales: Per-channel embedding strengths, indexed by channel of the
                YUV-converted carrier. Only the first two entries are consulted by
                `encode()` / `decode_with_length()`, and a channel whose scale is
                `<= 0` is skipped. Defaults to [0, 36, 0].
            block: Side length of the square blocks that the DWT approximation
                band is tiled into; one payload bit is embedded per block.
                Defaults to 4.
        """
        super().__init__()
        # Store a copy: the default list object is shared by every call, and a
        # caller-supplied list stays owned by the caller. Without the copy, a
        # later `codec.scales[i] = ...` would silently change both.
        self.scales = list(scales)
        self.block = block

    def encode(self, carrier: cv2.Mat, payload: str) -> tuple[cv2.Mat, int]:
        """Encodes payload string into an image with DWT + DCT transform and SVD.

        Args:
            carrier: The carrier image, read with `io.image.read_cv2()`.
            payload: The payload string.

        Returns:
            The encoded image, write with `io.image.write_cv2()`.
            Length of payload in bits, you will need this when decoding.
        """
        height, width, _ = carrier.shape
        # Only the top-left region whose sides are multiples of 4 is transformed;
        # any remaining rows/columns pass through the colour conversion untouched.
        usable_rows = height // 4 * 4
        usable_cols = width // 4 * 4
        yuv = cv2.cvtColor(carrier, cv2.COLOR_BGR2YUV)

        # Payload string converted to a sequence of bits
        payload_seq = payload_in_bytes(payload)
        payload_len = len(payload_seq)
        print(f"Payload length: {payload_len} bits")

        # NOTE: only channels 0 and 1 are visited, so `self.scales[2]` is unused.
        for channel in range(2):
            scale = self.scales[channel]
            if scale <= 0:
                continue

            approx, details = pywt.dwt2(
                yuv[:usable_rows, :usable_cols, channel], "haar"
            )
            # Embeds in place into the approximation band.
            self._encode_frame(payload_seq, payload_len, approx, scale)
            yuv[:usable_rows, :usable_cols, channel] = pywt.idwt2(
                (approx, details), "haar"
            )

        return cv2.cvtColor(yuv, cv2.COLOR_YUV2BGR), payload_len

    def decode(self, _):
        """Unsupported: the payload length is required to decode.

        Use `decode_with_length()` instead.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError("This codec does not support decoding without length")

    def decode_with_length(self, carrier: cv2.Mat, payload_len: int) -> str:
        """Try to decode payload from an image with a DWT + DCT transform and SVD.

        Args:
            carrier: The encoded steganographic image, read with `io.image.read_cv2()`.
            payload_len: The length of your payload in bits.

        Returns:
            The extracted payload string if successful.
        """
        height, width, _ = carrier.shape
        yuv = cv2.cvtColor(carrier, cv2.COLOR_BGR2YUV)
        # Same crop as `encode()`: sides rounded down to a multiple of 4.
        usable_rows = height // 4 * 4
        usable_cols = width // 4 * 4

        # One list of 0/1 votes per payload bit; every block contributes one vote.
        votes: list[list[int]] = [[] for _ in range(payload_len)]

        for channel in range(2):
            scale = self.scales[channel]
            if scale <= 0:
                continue

            approx, _details = pywt.dwt2(
                yuv[:usable_rows, :usable_cols, channel], "haar"
            )
            votes = self._decode_frame(payload_len, approx, scale, votes)

        # NOTE: a bit position that received no votes has an empty list; its mean
        # is NaN, which fails the comparison below and therefore decodes as 0.
        mean_votes = [np.array(bit_votes).mean() for bit_votes in votes]
        # Majority vote; an exact 0.5 tie gives 127.5 and decodes as 1.
        bits = np.array(mean_votes) * 255 > 127
        return reconstruct_bytes(bits, payload_len)

    def _decode_frame(
        self, wm_len: int, frame: np.ndarray, scale: int, scores: list[list[int]]
    ) -> list[list[int]]:
        """Collect one 0/1 vote per block of `frame` into `scores`.

        Blocks are visited row by row and assigned to payload bits cyclically
        (block k votes for bit `k % wm_len`).

        Args:
            wm_len: Number of payload bits.
            frame: 2-D coefficient matrix to read from.
            scale: Embedding strength that was used for this channel.
            scores: Per-bit vote lists; appended to in place.

        Returns:
            The same `scores` object that was passed in.
        """
        if wm_len <= 0:
            # No bit positions to vote for; also avoids `num % 0`.
            return scores

        size = self.block
        rows, cols = frame.shape
        index = 0

        for i in range(rows // size):
            for j in range(cols // size):
                tile = frame[i * size : (i + 1) * size, j * size : (j + 1) * size]
                scores[index % wm_len].append(self._infer_dct_svd(tile, scale))
                index += 1

        return scores

    def _diffuse_dct_svd(
        self, block: np.ndarray, wm_bit: np.uint8, scale: int
    ) -> np.ndarray:
        """Embed one bit into a block and return the modified block.

        The first singular value of the block's DCT is snapped to the quarter
        point (bit 0) or three-quarter point (bit 1) of its `scale`-wide bin.
        """
        u, s, v = np.linalg.svd(cv2.dct(block))

        s[0] = (s[0] // scale + 0.25 + 0.5 * wm_bit) * scale
        return cv2.idct(np.dot(u, np.dot(np.diag(s), v)))

    def _infer_dct_svd(self, block: np.ndarray, scale: int) -> int:
        """Read one bit from a block.

        Returns 1 when the first singular value of the block's DCT lies in the
        upper half of its `scale`-wide bin, otherwise 0.
        """
        _, s, _ = np.linalg.svd(cv2.dct(block))
        return int((s[0] % scale) > scale * 0.5)

    def _encode_frame(
        self, wm: list[np.uint8], wm_len: int, frame: np.ndarray, scale: int
    ) -> None:
        """Embed the payload bits into `frame` in place.

        `frame` is an (M, N) matrix that is tiled into `self.block` x `self.block`
        blocks, visited row by row. Block k carries bit `wm[k % wm_len]`, so the
        payload repeats cyclically over all blocks.

        Args:
            wm: Payload bits.
            wm_len: Number of payload bits.
            frame: 2-D coefficient matrix, modified in place.
            scale: Embedding strength for this channel.
        """
        if wm_len <= 0:
            # Nothing to embed; also avoids `num % 0`.
            return

        size = self.block
        rows, cols = frame.shape
        index = 0

        for i in range(rows // size):
            for j in range(cols // size):
                row_span = slice(i * size, (i + 1) * size)
                col_span = slice(j * size, (j + 1) * size)
                frame[row_span, col_span] = self._diffuse_dct_svd(
                    frame[row_span, col_span], wm[index % wm_len], scale
                )
                index += 1

__init__(scales=[0, 36, 0], block=4)

Frequency embedding method - DwtDctSvd uses DWT and DCT transforms, SVD decomposition of each block, to embed payload bits into singular value decomposition coefficients.

Parameters:

Name Type Description Default
scales list[int]

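A list of scaling factors (embedding strengths), one per channel of the YUV-converted carrier image. Only the first two entries are used, and a channel whose scale is 0 or negative is skipped. Defaults to [0, 36, 0].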

[0, 36, 0]
block int

Side length of the square blocks that the DWT approximation band is split into; one payload bit is embedded per block. Defaults to 4.

4
Source code in stegobox/codec/invisible_watermark/dwt_dct_svd.py
def __init__(self, scales: list[int] = [0, 36, 0], block: int = 4) -> None:
    """Frequency embedding method - DwtDctSvd uses DWT and DCT transforms, SVD
    decomposition of each block, to embed payload bits into singular value
    decomposition coefficients.

    Args:
        scales: Per-channel embedding strengths, indexed by channel of the
            YUV-converted carrier. A channel whose scale is `<= 0` is skipped.
            Defaults to [0, 36, 0].
        block: Side length of the square blocks that the DWT approximation band
            is tiled into; one payload bit is embedded per block. Defaults to 4.
    """
    super().__init__()
    # Store a copy: the default list object is shared by every call, and a
    # caller-supplied list stays owned by the caller. Without the copy, a later
    # `codec.scales[i] = ...` would silently change both.
    self.scales = list(scales)
    self.block = block

encode(carrier, payload)

Encodes payload string into an image with DWT + DCT transform and SVD.

Parameters:

Name Type Description Default
carrier Mat

The carrier image, read with io.image.read_cv2().

required
payload str

The payload string.

required

Returns:

Type Description
Mat

The encoded image, write with io.image.write_cv2().

int

Length of payload in bits, you will need this when decoding.

Source code in stegobox/codec/invisible_watermark/dwt_dct_svd.py
def encode(self, carrier: cv2.Mat, payload: str) -> tuple[cv2.Mat, int]:
    """Embed `payload` into `carrier` using a Haar DWT, block DCT and SVD.

    Args:
        carrier: The carrier image, read with `io.image.read_cv2()`.
        payload: The payload string.

    Returns:
        The encoded image, write with `io.image.write_cv2()`.
        Length of payload in bits, you will need this when decoding.
    """
    height, width, _ = carrier.shape
    # Only the top-left region whose sides are multiples of 4 is transformed.
    usable_rows = height // 4 * 4
    usable_cols = width // 4 * 4
    yuv = cv2.cvtColor(carrier, cv2.COLOR_BGR2YUV)

    # Payload string converted to a sequence of bits
    payload_seq = payload_in_bytes(payload)
    payload_len = len(payload_seq)
    print(f"Payload length: {payload_len} bits")

    # Only channels 0 and 1 are visited; channels with scale <= 0 are skipped.
    for channel in range(2):
        scale = self.scales[channel]
        if scale <= 0:
            continue

        approx, details = pywt.dwt2(yuv[:usable_rows, :usable_cols, channel], "haar")
        # Embeds in place into the approximation band.
        self._encode_frame(payload_seq, payload_len, approx, scale)
        yuv[:usable_rows, :usable_cols, channel] = pywt.idwt2(
            (approx, details), "haar"
        )

    return cv2.cvtColor(yuv, cv2.COLOR_YUV2BGR), payload_len

decode_with_length(carrier, payload_len)

Try to decode payload from an image with a DWT + DCT transform and SVD.

Parameters:

Name Type Description Default
carrier Mat

The encoded steganographic image, read with io.image.read_cv2().

required
payload_len int

The length of your payload in bits.

required

Returns:

Type Description
str

The extracted payload string if successful.

Source code in stegobox/codec/invisible_watermark/dwt_dct_svd.py
def decode_with_length(self, carrier: cv2.Mat, payload_len: int) -> str:
    """Recover a payload of known bit length from a DWT + DCT + SVD encoded image.

    Args:
        carrier: The encoded steganographic image, read with `io.image.read_cv2()`.
        payload_len: The length of your payload in bits.

    Returns:
        The extracted payload string if successful.
    """
    height, width, _ = carrier.shape
    yuv = cv2.cvtColor(carrier, cv2.COLOR_BGR2YUV)
    # Same crop as the encoder: sides rounded down to a multiple of 4.
    usable_rows = height // 4 * 4
    usable_cols = width // 4 * 4

    # One list of 0/1 votes per payload bit.
    votes: list[list[int]] = [[] for _ in range(payload_len)]

    for channel in range(2):
        scale = self.scales[channel]
        if scale <= 0:
            continue

        approx, _details = pywt.dwt2(yuv[:usable_rows, :usable_cols, channel], "haar")
        votes = self._decode_frame(payload_len, approx, scale, votes)

    mean_votes = [np.array(bit_votes).mean() for bit_votes in votes]
    # Majority vote; an exact 0.5 tie gives 127.5 and decodes as 1.
    bits = np.array(mean_votes) * 255 > 127
    return reconstruct_bytes(bits, payload_len)