The EM Algorithm and Its Code

1. Algorithm Overview

The EM algorithm, short for Expectation-Maximization, is a method for maximum likelihood estimation (or maximum a posteriori estimation) of the parameters of probabilistic models that contain hidden (latent) variables. It is an iterative algorithm, and each iteration consists of two steps: the E-step, which computes an expectation (Expectation), and the M-step, which maximizes it (Maximization).

2. Algorithm Steps

Quoted from PRML (Pattern Recognition and Machine Learning).

[Figure: the EM algorithm box from PRML]
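
In outline, the general EM algorithm as presented in PRML (Chapter 9) is: given a joint distribution p(X, Z | \theta) over observed variables X and latent variables Z, maximize the likelihood p(X | \theta) with respect to \theta:

1. Choose an initial setting for the parameters \theta^{old}.
2. E-step: evaluate the posterior p(Z \mid X, \theta^{old}).
3. M-step: evaluate \theta^{new} = \arg\max_\theta Q(\theta, \theta^{old}), where
   Q(\theta, \theta^{old}) = \sum_Z p(Z \mid X, \theta^{old}) \ln p(X, Z \mid \theta).
4. Check for convergence of the log-likelihood or of the parameter values; if not converged, set \theta^{old} \leftarrow \theta^{new} and return to step 2.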

3. Personal Summary

The EM algorithm is a method for estimating the parameters of models that contain latent variables. Because ordinary maximum likelihood estimation cannot directly handle models with hidden variables, the hidden variables have to be marginalized out. We do this by taking an expectation, because the log-likelihood can be decomposed into the sum of an expectation term (a lower bound) and a KL divergence. When the KL divergence is zero, the lower bound equals the log-likelihood, so maximizing the bound also maximizes the likelihood. This is similar in spirit to variational inference.
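
In symbols, for any distribution q(Z) over the latent variables, this is PRML's decomposition:

\log p(X \mid \theta) = \mathcal{L}(q, \theta) + \mathrm{KL}\big(q(Z) \,\|\, p(Z \mid X, \theta)\big),
\qquad
\mathcal{L}(q, \theta) = \sum_Z q(Z) \log \frac{p(X, Z \mid \theta)}{q(Z)}.

Since \mathrm{KL} \ge 0, the term \mathcal{L}(q, \theta) is a lower bound on the log-likelihood. The E-step closes the gap by setting q(Z) = p(Z \mid X, \theta^{old}), which drives the KL term to zero, and the M-step then maximizes the bound over \theta.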

Appendix: implementation of Exercise 9.3 from Chapter 9 of Li Hang's Statistical Learning Methods. (The values this code converges to depend on the initial parameter settings.)
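
For reference, these are the updates the code implements, i.e. standard EM for a two-component Gaussian mixture: with components k = 1, 2, mixing weights \alpha_1 = weight1 and \alpha_2 = 1 - weight1, and Gaussian density \phi, the E-step computes the responsibilities and the M-step re-estimates the parameters:

\hat\gamma_{jk} = \frac{\alpha_k\, \phi(y_j \mid \mu_k, \sigma_k^2)}{\sum_{k'=1}^{2} \alpha_{k'}\, \phi(y_j \mid \mu_{k'}, \sigma_{k'}^2)}

\mu_k = \frac{\sum_j \hat\gamma_{jk}\, y_j}{\sum_j \hat\gamma_{jk}}, \qquad
\sigma_k^2 = \frac{\sum_j \hat\gamma_{jk}\,(y_j - \mu_k)^2}{\sum_j \hat\gamma_{jk}}, \qquad
\alpha_k = \frac{\sum_j \hat\gamma_{jk}}{N}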

#  Statistical Learning Methods, Chapter 9, Exercise 9.3
#  Problem: estimate the 5 parameters of a 2-component Gaussian mixture model from the observed data:
#  -67, -48, 6, 8, 14, 16, 23, 24, 28, 29, 41, 49, 56, 60, 75

import random
import numpy as np


def calculate_norm(y, mu, sigma):
    # Gaussian density N(y | mu, sigma^2)
    return np.exp(-((y - mu) ** 2) / (2 * sigma ** 2)) / np.sqrt(2 * np.pi * sigma ** 2)


def initialize_parameter():
    # draw random initial values for the two components' parameters
    mu1 = random.randint(-100, 100)
    sigma1 = random.randint(100, 1000)
    mu2 = random.randint(-100, 100)
    sigma2 = random.randint(100, 1000)
    # only two Gaussian components: one has weight weight1, the other 1 - weight1
    weight1 = random.randint(1, 9) / 10
    print(mu1, mu2, sigma1, sigma2, weight1)
    return mu1, sigma1, mu2, sigma2, weight1


def E_step(y, mu1, sigma1, mu2, sigma2, weight1):
    # E-step: compute each component's responsibility for every observation
    size = len(y)
    gamma_j1 = np.zeros(size)
    gamma_j2 = np.zeros(size)
    for i in range(size):
        weighted_density1 = weight1 * calculate_norm(y[i], mu1, sigma1)
        weighted_density2 = (1 - weight1) * calculate_norm(y[i], mu2, sigma2)
        gamma_j1[i] = weighted_density1 / (weighted_density1 + weighted_density2)
        gamma_j2[i] = weighted_density2 / (weighted_density1 + weighted_density2)
    return gamma_j1, gamma_j2


def M_step(y, gamma_j1, gamma_j2, mu1, mu2):
    # M-step: re-estimate the means, standard deviations, and mixing weight
    size = len(y)
    sum1 = np.zeros(2)
    sum2 = np.zeros(2)
    sum3 = np.zeros(2)
    for i in range(size):
        sum1[0] += gamma_j1[i] * y[i]
        sum2[0] += gamma_j1[i]
        sum3[0] += gamma_j1[i] * (y[i] - mu1) * (y[i] - mu1)
        sum1[1] += gamma_j2[i] * y[i]
        sum2[1] += gamma_j2[i]
        sum3[1] += gamma_j2[i] * (y[i] - mu2) * (y[i] - mu2)
    mu1_new = sum1[0] / sum2[0]
    sigma1_new = np.sqrt(sum3[0] / sum2[0])
    mu2_new = sum1[1] / sum2[1]
    sigma2_new = np.sqrt(sum3[1] / sum2[1])
    weight1_new = sum2[0] / size
    return mu1_new, sigma1_new, mu2_new, sigma2_new, weight1_new


if __name__ == '__main__':
    y = [-67, -48, 6, 8, 14, 16, 23, 24, 28, 29, 41, 49, 56, 60, 75]
    # depending on the random initialization, the run may converge to a different
    # local optimum or even produce NaN; uncomment the fixed values below for a reproducible run
    mu1, sigma1, mu2, sigma2, weight1 = initialize_parameter()
    # mu1, sigma1, mu2, sigma2, weight1 = [30, 500, -30, 500, 0.5]
    epoch = 100
    for _ in range(epoch):
        gamma_j1, gamma_j2 = E_step(y, mu1, sigma1, mu2, sigma2, weight1)
        mu1, sigma1, mu2, sigma2, weight1 = M_step(y, gamma_j1, gamma_j2, mu1, mu2)
        print(mu1, mu2, sigma1, sigma2, weight1)
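
The loop above always runs a fixed 100 iterations. As an optional refinement (a sketch, not part of the book's exercise): since EM never decreases the observed-data log-likelihood, one can monitor it and stop once it plateaus. The helper log_likelihood below is introduced here for illustration and is not part of the original code:

def log_likelihood(y, mu1, sigma1, mu2, sigma2, weight1):
    # observed-data log-likelihood of the two-component mixture (helper added for illustration)
    return sum(np.log(weight1 * calculate_norm(yi, mu1, sigma1)
                      + (1 - weight1) * calculate_norm(yi, mu2, sigma2))
               for yi in y)

# alternative training loop with early stopping:
prev_ll = -np.inf
for _ in range(epoch):
    gamma_j1, gamma_j2 = E_step(y, mu1, sigma1, mu2, sigma2, weight1)
    mu1, sigma1, mu2, sigma2, weight1 = M_step(y, gamma_j1, gamma_j2, mu1, mu2)
    ll = log_likelihood(y, mu1, sigma1, mu2, sigma2, weight1)
    if ll - prev_ll < 1e-8:  # EM guarantees ll is non-decreasing
        break
    prev_ll = ll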