
Weighted Random Sampling (WRS)

As the title suggests, weighted random sampling (WRS) is a requirement that arises in many applications.

First, what exactly is weighted random sampling? Here we consider sampling without replacement. Each item carries a weight, and the probability of drawing an item is determined by its weight. Below is a general description of the weighted random sampling algorithm.

Weighted random sampling (algorithm D):
Input: a set \(V\) of \(n\) weighted items
Output: a set \(S\) of \(m\) items drawn by WRS
1. Repeat steps 2–3 until \(|S| = m\)
2. Compute the probability of drawing each item \(v_i\): \(P(v_i) = \frac{w_i}{\sum_{v_j \in V-S} w_j}\)
3. Draw one item \(v_i\) at random from \(V-S\) according to these probabilities and add it to \(S\)
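A direct, if inefficient for large \(n\), implementation of algorithm D renormalizes the remaining weights after every draw. A minimal sketch in NumPy (the function name `wrs_d` is my own, not from the paper):

```python
import numpy as np


def wrs_d(weights, m, rng=None):
    """Algorithm D: draw m items without replacement,
    renormalizing the remaining weights after each draw."""
    rng = np.random.default_rng() if rng is None else rng
    weights = np.asarray(weights, dtype=float)
    remaining = list(range(len(weights)))  # indices still in V - S
    chosen = []
    for _ in range(m):
        w = weights[remaining]
        probs = w / w.sum()  # P(v_i) = w_i / sum_{v_j in V-S} w_j
        pick = rng.choice(len(remaining), p=probs)
        chosen.append(remaining.pop(pick))
    return chosen
```

Each iteration costs \(O(n)\) to rebuild the probabilities, so the whole draw is \(O(nm)\); algorithm A below avoids this.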

For uniform random sampling (uniform RS), the key techniques are the Alias Method, partial sum trees, and the acceptance/rejection method (see reference 3), but these do not apply directly to WRS. The papers in references 1 and 2 give algorithms that solve WRS.

Weighted random sampling (algorithm A):
Input: a set \(V\) of \(n\) weighted items
Output: a set \(S\) of \(m\) items drawn by WRS
1. For each item \(v_i \in V\), draw a random number \(u_i \in (0, 1)\) and compute the key \(k_i = u_i^{1/w_i}\)
2. Select the \(m\) items with the largest keys \(k_i\) as the WRS set

The main idea behind the algorithm: for any two independent uniform \((0, 1)\) variables \(u_1\) and \(u_2\), if \(x_1 = u_1^{1/w_1}\) and \(x_2 = u_2^{1/w_2}\) with \(w_1, w_2 > 0\), then \(P(x_1 \leq x_2) = \frac{w_2}{w_1 + w_2}\)
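This identity is easy to check numerically, using the same key definition \(x_i = u_i^{1/w_i}\) as the algorithm:

```python
import numpy as np

rng = np.random.default_rng(0)
w1, w2 = 2.0, 3.0
n = 200_000
# keys x_i = u_i ** (1 / w_i) for two independent uniform samples
x1 = rng.uniform(size=n) ** (1 / w1)
x2 = rng.uniform(size=n) ** (1 / w2)
print((x1 <= x2).mean())  # close to w2 / (w1 + w2) = 0.6
```

Intuitively, a larger weight pushes the key toward 1, so heavier items win the comparison more often, in exact proportion to their weights.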

Python implementation of the algorithm (each point's weight is its distance to a known line):

import numpy as np
import math
from matplotlib import pyplot as plt


def getWeight(data):
    weights = []
    for item in data:
        # perpendicular distance from (x, y) to the line y = x + 15
        weights.append(abs(item[0]-item[1]+15) / math.sqrt(2))
    return np.array(weights)


def wrsA(data, weights):
    """ Weighted random sampling """
    items_len = len(data)
    U = np.random.uniform(0, 1, items_len)  # uniform(0, 1) random numbers
    keys = U ** (1 / weights)  # keys k_i = u_i ** (1 / w_i)
    return np.argsort(-keys)[:8]  # indices of the 8 largest keys (m = 8)


if __name__ == '__main__':
    data = np.array([
        [52, 82],
        [56, 79],
        [64, 90],
        [69, 95],
        [58, 96],
        [62, 95],
        [53, 73],
        [72, 91],
        [77, 90],
        [54, 70],
        [50, 74],
        [50, 81],
        [61, 85]])
    weights = getWeight(data)
    idx = None
    all_idx = []
    for i in range(1000):
        idx = wrsA(data, weights)
        all_idx.extend(list(idx))

    pure_all_idx = sorted(list(set(all_idx)))
    result = []
    for i in pure_all_idx:
        result.append([i, all_idx.count(i)])

    pure_all_idx = np.array(pure_all_idx)
    result = np.array(result)

    plt.figure(figsize=(14, 10))
    ax = plt.subplot(211)
    rects = ax.bar(result[:, 0], result[:, 1])
    plt.xticks(result[:, 0], result[:, 0])
    ax.set_title('sampling bar (weight, num of sampling)')
    def autolabel(rects, weights):
        """Attach a text label above each bar in *rects*, displaying its height."""
        for rect, weight in zip(rects, weights):
            height = rect.get_height()
            ax.annotate('({},{})'.format(round(weight, 2), height),
                        xy=(rect.get_x() + rect.get_width() / 2, height),
                        xytext=(0, 3),  # 3 points vertical offset
                        textcoords="offset points",
                        ha='center', va='bottom')

    autolabel(rects, weights)

    ax = plt.subplot(212)
    ax.set_title('WRS')
    plt.scatter(data[:, 0], data[:, 1], c='orange', label='unselected')
    plt.legend()
    x = np.arange(50, 80, 0.01)
    y = x + 15
    plt.plot(x, y)
    plt.annotate(r'$y=x+15$', xy=(65, 78))
    for i in range(len(data)):
        if i in idx:
            plt.scatter(data[i, 0], data[i, 1], c='black')

    plt.show()

The figure below shows the sampling counts over 1000 runs and the points selected in the final draw.

Reference 2 proves the correctness of the algorithm above; readers who want the details can consult that paper. In addition, if the sample size is not known in advance (i.e., the data arrives as a stream), the paper's A-Res algorithm and its improved variant A-ExpJ can be used.
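For the streaming case, A-Res keeps only the \(m\) items with the largest keys seen so far, which a min-heap handles naturally. A minimal sketch (the function name `a_res` and the `(item, weight)` stream format are my own choices, not the paper's pseudocode verbatim):

```python
import heapq
import random


def a_res(stream, m, rng=random):
    """A-Res: weighted reservoir sampling over a stream of (item, weight) pairs.
    Keeps a min-heap of the m items with the largest keys u ** (1 / w)."""
    heap = []  # (key, item) pairs; smallest key at heap[0]
    for item, weight in stream:
        key = rng.random() ** (1.0 / weight)
        if len(heap) < m:
            heapq.heappush(heap, (key, item))
        elif key > heap[0][0]:
            # new item's key beats the current minimum: swap it in
            heapq.heapreplace(heap, (key, item))
    return [item for _, item in heap]
```

Because each item is examined once and heap operations cost \(O(\log m)\), the whole pass is \(O(n \log m)\) with \(O(m)\) memory, which is what makes it suitable for streams. A-ExpJ improves on this by generating exponential jumps so that most items are skipped without drawing a random number at all.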

References
1. Pavlos S. Efraimidis, Paul G. Spirakis. Weighted Random Sampling. 2005.
2. Pavlos S. Efraimidis, Paul G. Spirakis. Weighted random sampling with a reservoir. Information Processing Letters, 2006.
3. https://zhuanlan.zhihu.com/p/42630740