利用Python对武汉新型冠状肺炎SEIR模型进行数据可视化
在经典的传染病学模型中,R0 值常被用来描述疫情的传染速率,可以反映传染病爆发的潜力和严重程度。序列间隔也常被用来形容疾病的传播速度。
1 SIR模型简介
SIR模型是常见的一种描述传染病传播的数学模型,其基本假设是将人群分为以下三类:
1 易感人群(Susceptible):指未得病者,但缺乏免疫能力,与感病者接触后容易受到感染。
2 感染人群(Infective):指染上传染病的人,他可以传播给易感人群。
3 潜伏人群(Explode):指染上疾病的人,但未被发现。
4 移除人群(Removed):被移出系统的人。因病愈(具有免疫力)或死亡的人。这部分人不再参与感染和被感染过程。
利用Python进行编程分析。利用1月22日至3月12日的数据进行编程分析;
部分代码:
from collections import defaultdict
from datetime import timedelta
from dateutil.relativedelta import relativedelta
from IPython.display import display, Markdown
import math
import os
from pprint import pprint
import warnings
from fbprophet import Prophet
from fbprophet.plot import add_changepoints_to_plot
import pystan.misc # in model.fit(): AttributeError: module 'pystan' has no attribute 'misc'
import matplotlib.pyplot as plt
import matplotlib.cm as cm
import matplotlib
from matplotlib.ticker import ScalarFormatter
%matplotlib inline
import numpy as np
import optuna
optuna.logging.disable_default_handler()
import pandas as pd
import dask.dataframe as dd
pd.plotting.register_matplotlib_converters()
import seaborn as sns
from scipy.integrate import solve_ivp
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.cluster import KMeans
df = population_raw.copy()
df = df.rename({"Province.State": "Province", "Country.Region": "Country"}, axis=1)
cols = ["Country", "Province", "Population"]
df = df.loc[:, cols].fillna("-")
df.loc[df["Country"] == df["Province"], "Province"] = "-"
# Add total records
_total_df = df.loc[df["Province"] != "-", :].groupby("Country").sum()
_total_df = _total_df.reset_index().assign(Province="-")
df = pd.concat([df, _total_df], axis=0, sort=True)
df = df.drop_duplicates(subset=["Country", "Province"], keep="first")
# Global
global_value = df.loc[df["Province"] == "-", "Population"].sum()
df = df.append(pd.Series(["Global", "-", global_value], index=cols), ignore_index=True)
# Sorting
df = df.sort_values("Population", ascending=False).reset_index(drop=True)
df = df.loc[:, cols]
population_df = df.copy()
population_df.head()
df = population_df.loc[population_df["Province"] == "-", :]
population_dict = df.set_index("Country").to_dict()["Population"]
population_dict
def line_plot(df, title, xlabel=None, ylabel="Cases",
h=None, v=None, xlim=(None, None), ylim=(0, None),
math_scale=True, y_logscale=False, y_integer=False,
show_legend=True, bbox_to_anchor=(1.02, 0), bbox_loc="lower left"):
"""
Show chlonological change of the data.
"""
ax = df.plot()
if math_scale:
ax.yaxis.set_major_formatter(ScalarFormatter(useMathText=True))
ax.ticklabel_format(style="sci", axis="y",scilimits=(0, 0))
if y_logscale:
ax.set_yscale("log")
if ylim[0] == 0:
ylim = (None, None)
if y_integer:
fmt = matplotlib.ticker.ScalarFormatter(useOffset=False)
fmt.set_scientific(False)
ax.yaxis.set_major_formatter(fmt)
ax.set_title(title)
ax.set_xlabel(xlabel)
ax.set_ylabel(ylabel)
ax.set_xlim(*xlim)
ax.set_ylim(*ylim)
if show_legend:
ax.legend(bbox_to_anchor=bbox_to_anchor, loc=bbox_loc, borderaxespad=0)
else:
ax.legend().set_visible(False)
if h is not None:
ax.axhline(y=h, color="black", linestyle="--")
if v is not None:
if not isinstance(v, list):
v = [v]
for value in v:
ax.axvline(x=value, color="black", linestyle="--")
plt.tight_layout()
plt.show()
class Estimator(object):
def __init__(self, model, ncov_df, total_population, name=None, places=None, areas=None,
excluded_places=None, start_date=None, end_date=None, date_format="%d%b%Y", **params):
"""
Set training data.
@model <ModelBase>: the model
@name <str>: name of the area
@params: fixed parameter of the model
@the other params: See the function named create_target_df()
"""
# Fixed parameters
self.fixed_param_dict = params.copy()
if None in params.values():
self.fixed_param_dict = {
k: v for (k, v) in params.items() if v is not None
}
# Register the dataset arranged for the model
dataset = model.create_dataset(
ncov_df, total_population, places=places, areas=areas,
excluded_places=excluded_places,
start_date=start_date, end_date=end_date, date_format=date_format
)
self.start_time, self.initials, self.Tend, self.train_df = dataset
self.total_population = total_population
self.name = name
self.model = model
self.param_dict = dict()
self.study = None
self.optimize_df = None
def run(self, n_trials=500):
"""
Try estimation (optimization of parameters and tau).
@n_trials <int>: the number of trials
"""
if self.study is None:
self.study = optuna.create_study(direction="minimize")
self.study.optimize(
lambda x: self.objective(x),
n_trials=n_trials,
n_jobs=-1
)
param_dict = self.study.best_params.copy()
param_dict.update(self.fixed_param_dict)
param_dict["R0"] = self.calc_r0()
param_dict["score"] = self.score()
param_dict.update(self.calc_days_dict())
self.param_dict = param_dict.copy()
return param_dict
def history_df(self):
"""
Return the hsitory of optimization.
@return <pd.DataFrame>
"""
optimize_df = self.study.trials_dataframe()
optimize_df["time[s]"] = optimize_df["datetime_complete"] - \
optimize_df["datetime_start"]
optimize_df["time[s]"] = optimize_df["time[s]"].dt.total_seconds()
self.optimize_df = optimize_df.drop(
["datetime_complete", "datetime_start", "system_attrs__number"], axis=1)
return self.optimize_df.sort_values("value", ascending=True)
def history_graph(self):
"""
Show the history of parameter search using pair-plot.
"""
if self.optimize_df is None:
self.history_df()
df = self.optimize_df.copy()
sns.pairplot(df.loc[:, df.columns.str.startswith(
"params_")], diag_kind="kde", markers="+")
plt.show()
def objective(self, trial):
# Time
try:
tau = self.fixed_param_dict["tau"]
except KeyError:
tau = trial.suggest_int("tau", 1, 1440)
train_df_divided = self.train_df.copy()
train_df_divided["t"] = (train_df_divided["T"] / tau).astype(np.int64)
# Parameters
param_dict = self.model.param_dict(train_df_divided)
p_dict = {"tau": None}
p_dict.update(
{
k: trial.suggest_uniform(k, *v)
for (k, v) in param_dict.items()
}
)
p_dict.update(self.fixed_param_dict)
p_dict.pop("tau")
# Simulation
t_end = train_df_divided.loc[train_df_divided.index[-1], "t"]
sim_df = simulation(self.model, self.initials, step_n=t_end, **p_dict)
return self.error_f(train_df_divided, sim_df)
def error_f(self, train_df_divided, sim_df):
"""
We need to minimize the difference of the observed values and estimated values.
This function calculate the difference of the estimated value and obsereved value.
"""
n = self.total_population
df = pd.merge(train_df_divided, sim_df, on="t", suffixes=("_observed", "_estimated"))
diffs = [
# Weighted Average: the recent data is more important
p * np.average(
abs(df[f"{v}_observed"] - df[f"{v}_estimated"]) / (df[f"{v}_observed"] * n + 1),
weights=df["t"]
)
for (p, v) in zip(self.model.PRIORITIES, self.model.VARIABLES)
]
return sum(diffs) * n
def compare_df(self):
"""
Show the taining data and simulated data in one dataframe.
"""
est_dict = self.study.best_params.copy()
est_dict.update(self.fixed_param_dict)
tau = est_dict["tau"]
est_dict.pop("tau")
observed_df = self.train_df.drop("T", axis=1)
observed_df["t"] = (self.train_df["T"] / tau).astype(int)
t_end = observed_df.loc[observed_df.index[-1], "t"]
sim_df = simulation(self.model, self.initials, step_n=t_end, **est_dict)
df = pd.merge(observed_df, sim_df, on="t", suffixes=("_observed", "_estimated"))
df = df.set_index("t")
return df
def compare_graph(self):
"""
Compare obsereved and estimated values in graphs.
"""
df = self.compare_df()
use_variables = [
v for (i, (p, v)) in enumerate(zip(self.model.PRIORITIES, self.model.VARIABLES))
if p != 0 and i != 0
]
val_len = len(use_variables) + 1
fig, axes = plt.subplots(
ncols=1, nrows=val_len, figsize=(9, 6 * val_len / 2))
for (ax, v) in zip(axes.ravel()[1:], use_variables):
df[[f"{v}_observed", f"{v}_estimated"]].plot.line(
ax=ax, ylim=(0, None), sharex=True,
title=f"{self.model.NAME}: Comparison of observed/estimated {v}(t)"
)
ax.yaxis.set_major_formatter(ScalarFormatter(useMathText=True))
ax.ticklabel_format(style="sci", axis="y", scilimits=(0, 0))
ax.legend(bbox_to_anchor=(1.02, 0),
loc="lower left", borderaxespad=0)
for v in use_variables:
df[f"{v}_diff"] = df[f"{v}_observed"] - df[f"{v}_estimated"]
df[f"{v}_diff"].plot.line(
ax=axes.ravel()[0], sharex=True,
title=f"{self.model.NAME}: observed - estimated"
)
axes.ravel()[0].axhline(y=0, color="black", linestyle="--")
axes.ravel()[0].yaxis.set_major_formatter(
ScalarFormatter(useMathText=True))
axes.ravel()[0].ticklabel_format(
style="sci", axis="y", scilimits=(0, 0))
axes.ravel()[0].legend(bbox_to_anchor=(1.02, 0),
loc="lower left", borderaxespad=0)
fig.tight_layout()
fig.show()
def calc_r0(self):
"""
Calculate R0.
"""
est_dict = self.study.best_params.copy()
est_dict.update(self.fixed_param_dict)
est_dict.pop("tau")
model_instance = self.model(**est_dict)
return model_instance.calc_r0()
def calc_days_dict(self):
"""
Calculate 1/beta etc.
"""
est_dict = self.study.best_params.copy()
est_dict.update(self.fixed_param_dict)
tau = est_dict["tau"]
est_dict.pop("tau")
model_instance = self.model(**est_dict)
return model_instance.calc_days_dict(tau)
def predict_df(self, step_n):
"""
Predict the values in the future.
@step_n <int>: the number of steps
@return <pd.DataFrame>: predicted data for measurable variables.
"""
est_dict = self.study.best_params.copy()
est_dict.update(self.fixed_param_dict)
tau = est_dict["tau"]
est_dict.pop("tau")
df = simulation(self.model, self.initials, step_n=step_n, **est_dict)
df["Time"] = (
df["t"] * tau).apply(lambda x: timedelta(minutes=x)) + self.start_time
df = df.set_index("Time").drop("t", axis=1)
df = (df * self.total_population).astype(np.int64)
upper_cols = [n.upper() for n in df.columns]
df.columns = upper_cols
df = self.model.calc_variables_reverse(df).drop(upper_cols, axis=1)
return df
def predict_graph(self, step_n, name=None, excluded_cols=None):
"""
Predict the values in the future and create a figure.
@step_n <int>: the number of steps
@name <str>: name of the area
@excluded_cols <list[str]>: the excluded columns in the figure
"""
if self.name is not None:
name = self.name
else:
name = str() if name is None else name
df = self.predict_df(step_n=step_n)
if excluded_cols is not None:
df = df.drop(excluded_cols, axis=1)
r0 = self.param_dict["R0"]
title = f"Prediction in {name} with {self.model.NAME} model: R0 = {r0}"
today = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
line_plot(df, title, v=today, h=self.total_population)
def rmsle(self, compare_df):
"""
Return the value of RMSLE.
@param compare_df <pd.DataFrame>
"""
df = compare_df.set_index("t") * self.total_population
score = 0
for (priority, v) in zip(self.model.PRIORITIES, self.model.VARIABLES):
if priority == 0:
continue
observed, estimated = df[f"{v}_observed"], df[f"{v}_estimated"]
diff = (np.log10(observed + 1) - np.log10(estimated + 1))
score += (diff ** 2).sum()
rmsle = np.sqrt(score / len(df) * 2)
return rmsle
def score(self):
"""
Return the value of RMSLE.
"""
rmsle = self.rmsle(self.compare_df().reset_index("t"))
return rmsle
def info(self):
"""
Return Estimater information.
@return <tupple[object]>:
- <ModelBase>: model
- <dict[str]=str>: name, total_population, start_time, tau
- <dict[str]=float>: values of parameters of model
"""
param_dict = self.study.best_params.copy()
param_dict.update(self.fixed_param_dict)
info_dict = {
"name": self.name,
"total_population": self.total_population,
"start_time": self.start_time,
"tau": param_dict["tau"],
"initials": self.initials
}
param_dict.pop("tau")
return (self.model, info_dict, param_dict)
得到部分图表如下: