Skip to content

SUniward

stegobox.codec.SUniward

Bases: BaseCodec

SUniward is a steganography method for Image.

Source code derived from: S-UNIWARD-python

Only encode function is available.

Source code in stegobox/codec/s_uniward.py
class SUniward(BaseCodec):
    """
    SUniward is a steganography method for Image.

    Source code derived from:
    [S-UNIWARD-python](https://github.com/TracyCuiq/S-UNIWARD-python)

    Only encode function is available.
    """

    def __init__(self) -> None:
        super().__init__()
        np.set_printoptions(threshold=sys.maxsize)

    def encode(self, carrier: Image.Image, payload: float) -> Image.Image:
        """Merge secret_img into host image

        Args:
            carrier: host image
            payload: payload in bits per pixel

        Returns:
            Image: stego image.
        """

        cover = np.array(carrier)
        stego = self.s_uniward(cover, payload)

        return stego

    def decode(self, carrier: Image.Image) -> None:
        # TODO: decode
        raise NotImplementedError("decode is not implemented")

    def s_uniward(self, cover: np.ndarray, payload: float) -> Image.Image:
        sgm = 1
        # Get 2D wavelet filters - Daubechies 8
        # 1D high pass decomposition filter

        hpdf_list = [
            -0.0544158422,
            0.3128715909,
            -0.6756307363,
            0.5853546837,
            0.0158291053,
            -0.2840155430,
            -0.0004724846,
            0.1287474266,
            0.0173693010,
            -0.0440882539,
            -0.0139810279,
            0.0087460940,
            0.0048703530,
            -0.0003917404,
            -0.0006754494,
            -0.0001174768,
        ]

        # 1D low pass decomposition filter
        hpdf_len = range(0, len(hpdf_list))
        hpdf_list_reverse = hpdf_list[::-1]
        lpdf_list = hpdf_list
        for i in range(len(hpdf_list)):
            lpdf_list[i] = ((-1) ** hpdf_len[i]) * hpdf_list_reverse[i]
        hpdf_array = np.array([hpdf_list])
        lpdf_array = np.array([lpdf_list])
        lpdf = lpdf_array.reshape(len(lpdf_list), 1)
        hpdf = hpdf_array.reshape(len(hpdf_list), 1)
        # construction of 2D wavelet filters
        f1 = lpdf * hpdf_array
        f2 = hpdf * lpdf_array
        f3 = hpdf * hpdf_array

        w_f = np.zeros((f1.shape[0], f1.shape[0], 3))
        w_f[:, :, 0] = f1
        w_f[:, :, 1] = f2
        w_f[:, :, 2] = f3

        # Get embedding costs
        # initialization

        wetcost = 100000000
        cover_k, cover_l, _ = cover.shape

        # add padding
        s1, _1 = f1.shape
        s2, _2 = f2.shape
        s3, _3 = f3.shape

        pad_size = max(s1, s2, s3)
        cover_padded = np.zeros((cover_k + pad_size * 2, cover_l + pad_size * 2, 3))
        for i in range(3):
            cover_padded[:, :, i] = np.lib.pad(cover[:, :, i], pad_size, "symmetric")
        xi = np.zeros((cover_k + pad_size * 2, cover_l + pad_size * 2, 3))
        x = np.zeros((cover_k, cover_l, 3))
        for i in range(3):
            # compute residual
            r = convolve2d(cover_padded[:, :, i], w_f[:, :, i], mode="same")
            xi[:, :, i] = convolve2d(
                1.0 / (np.abs(r) + sgm), np.rot90(abs(w_f[:, :, i]), 2), mode="same"
            )
            # correct the suitability shift if filter size is even
            if s1 % 2 == 0:
                xi[:, :, i] = np.roll(xi[:, :, i], [1, 0])
                xi[:, :, i] = np.roll(xi[:, :, i], [0, 1])
            # remove padding
            s_xi, __xi = xi[:, :, i].shape
            x[:, :, i] = xi[
                int((s_xi - cover_k) / 2) : int(-(s_xi - cover_k) / 2),
                int((__xi - cover_l) / 2) : int(-(__xi - cover_l) / 2),
                i,
            ]

        # compute embedding costs \rho
        # rho = np.zeros((k, l))
        rho = x[:, :, 0] + x[:, :, 1] + x[:, :, 2]

        # adjust embedding costs
        a, b = np.where(rho > wetcost)
        for i in range(len(a)):
            rho[a[i], b[i]] = wetcost  # threshold on the costs
        a, b = np.where(np.isnan(rho))
        for i in range(len(a)):
            rho[a[i], b[i]] = wetcost  # if all xi{} are zero threshold the cost

        # k, k_ = rho.shape
        rhop1 = np.zeros((cover_k, cover_l, 3))
        rhom1 = np.zeros((cover_k, cover_l, 3))

        for i in range(3):
            rhop1[:, :, i] = rho
            rhom1[:, :, i] = rho
        # a, b, c = np.where(cover - 255.0 <= 0.1)
        a, b, c = np.where(cover == 255)

        for i in range(len(a)):
            rhop1[
                a[i], b[i], c[i]
            ] = wetcost  # do not embed +1 if the pixel has max value

        # a, b, c = np.where(cover - 0 <= 0.1)
        a, b, c = np.where(cover == 0)
        for i in range(len(a)):
            rhom1[
                a[i], b[i], c[i]
            ] = wetcost  # do not embed -1 if the pixel has min value

        # Embedding simulator ##
        cover_len = len(cover[:, :, 0]) * len(cover[:, :, 0])
        stego = cover

        for i in range(3):
            stego[:, :, i] = self.embedding_simulator_singel(
                cover[:, :, i],
                rhop1[:, :, i],
                rhom1[:, :, i],
                payload * cover_len,
                fix_embedding_changes=False,
            )
        return Image.fromarray(stego)

    def embedding_simulator_singel(
        self,
        x: np.ndarray,
        rhop1: np.ndarray,
        rhom1: np.ndarray,
        m: float,
        fix_embedding_changes: bool = False,
    ) -> np.ndarray:
        w, h = x.shape
        cover_len = w * h
        lam = self.cal_lambda_(rhop1, rhom1, m, cover_len)
        shape = rhop1.shape
        pchange_p1 = [
            (math.exp(-lam * rhop1[i][j]))
            / (1 + math.exp(-lam * rhop1[i][j]) + math.exp(-lam * rhom1[i][j]))
            for j in range(shape[1])
            for i in range(shape[0])
        ]
        pchange_m1 = [
            (math.exp(-lam * rhom1[i][j]))
            / (1 + math.exp(-lam * rhop1[i][j]) + math.exp(-lam * rhom1[i][j]))
            for j in range(shape[1])
            for i in range(shape[0])
        ]
        pchange_p1_array = np.array(pchange_p1).reshape(shape[1], shape[0]).T
        pchange_m1_array = np.array(pchange_m1).reshape(shape[1], shape[0]).T
        if fix_embedding_changes:
            np.random.seed(139187)
        rand_change = np.random.rand(w, h)
        y = x

        arr0, _0 = np.where(rand_change < pchange_p1_array)
        for i in range(len(arr0)):
            y[arr0[i]][_0[i]] += 1

        arr1, _1 = np.where(
            (rand_change >= pchange_p1_array)
            & (rand_change < pchange_p1_array + pchange_m1_array)
        )
        for i in range(len(arr1)):
            y[arr1[i]][_1[i]] -= 1
        return y

    def cal_lambda_(
        self, rhop1: np.ndarray, rhom1: np.ndarray, message_length: float, n: int
    ) -> float:
        l3 = 1e3
        m3 = float(message_length + 1)
        iterations = 0
        lam = 0.0
        while m3 > message_length:
            pp1 = rhop1
            # shape = lambda x: pp1.shape if pp1.shape == pm1.shape else 0
            shape = pp1.shape
            l3 = l3 * 2
            pp1 = [
                (math.exp(-l3 * rhop1[i][j]))
                / (1 + math.exp(-l3 * rhop1[i][j]) + math.exp(-l3 * rhom1[i][j]))
                for j in range(shape[1])
                for i in range(shape[0])
            ]  # list
            pm1 = [
                (math.exp(-l3 * rhom1[i][j]))
                / (1 + math.exp(-l3 * rhop1[i][j]) + math.exp(-l3 * rhom1[i][j]))
                for j in range(shape[1])
                for i in range(shape[0])
            ]  # list

            m3 = self.ternary_entropyf_4list(pp1, pm1)
            iterations = iterations + 1
            if iterations > 10:
                return l3
            l1 = 0.0
            m1 = float(n)
            lam = 0.0
            alpha = message_length / n

            # limit search to 30 iterations
            while (m1 - m3) / n > alpha / 1000.0 and iterations < 30:
                lam = l1 + (l3 - l1) / 2
                pp1 = [
                    (math.exp(-lam * rhop1[i][j]))
                    / (1 + math.exp(-lam * rhop1[i][j]) + math.exp(-lam * rhom1[i][j]))
                    for j in range(shape[1])
                    for i in range(shape[0])
                ]
                pm1 = [
                    (math.exp(-lam * rhom1[i][j]))
                    / (1 + math.exp(-lam * rhop1[i][j]) + math.exp(-lam * rhom1[i][j]))
                    for j in range(shape[1])
                    for i in range(shape[0])
                ]
                m2 = self.ternary_entropyf_4list(pp1, pm1)
                if m2 < message_length:
                    l3 = lam
                    m3 = m2
                else:
                    l1 = lam
                    m1 = m2

                iterations = iterations + 1
        return lam

    def ternary_entropyf_4list(self, pp1_: list, pm1_: list) -> float:
        p0 = [1 - pp1_[i] - pm1_[i] for i in range(len(pp1_))]
        p = p0 + pp1_ + pm1_
        ht = 0.0

        for i in range(len(p)):
            if p[i] != 0:
                h = -(p[i] * math.log(p[i]))
                ht += h

        return ht

encode(carrier, payload)

Merge secret_img into host image

Parameters:

Name Type Description Default
carrier Image

host image

required
payload float

payload in bits per pixel

required

Returns:

Name Type Description
Image Image

stego image.

Source code in stegobox/codec/s_uniward.py
def encode(self, carrier: Image.Image, payload: float) -> Image.Image:
    """Merge secret_img into host image

    Args:
        carrier: host image
        payload: payload in bits per pixel

    Returns:
        Image: stego image.
    """

    cover = np.array(carrier)
    stego = self.s_uniward(cover, payload)

    return stego