利用Python对武汉新型冠状肺炎SEIR模型进行数据可视化

在经典的传染病学模型中,R0 值常被用来描述疫情的传染速率,可以反映传染病爆发的潜力和严重程度。序列间隔也常被用来形容疾病的传播速度。

1 SIR模型简介

SIR模型是常见的一种描述传染病传播的数学模型,其基本假设是将人群分为以下三类:

1 易感人群(Susceptible):指未得病者,但缺乏免疫能力,与感病者接触后容易受到感染。

2 感染人群(Infective):指染上传染病的人,他可以传播给易感人群。

3 潜伏人群(Explode):指染上疾病的人,但未被发现。

4 移除人群(Removed):被移出系统的人。因病愈(具有免疫力)或死亡的人。这部分人不再参与感染和被感染过程。

利用Python进行编程分析。利用1月22日至3月12日的数据进行编程分析;

部分代码:

from collections import defaultdict

from datetime import timedelta

from dateutil.relativedelta import relativedelta

from IPython.display import display, Markdown

import math

import os

from pprint import pprint

import warnings

from fbprophet import Prophet

from fbprophet.plot import add_changepoints_to_plot

import pystan.misc # in model.fit(): AttributeError: module 'pystan' has no attribute 'misc'

import matplotlib.pyplot as plt

import matplotlib.cm as cm

import matplotlib

from matplotlib.ticker import ScalarFormatter

%matplotlib inline

import numpy as np

import optuna

optuna.logging.disable_default_handler()

import pandas as pd

import dask.dataframe as dd

pd.plotting.register_matplotlib_converters()

import seaborn as sns

from scipy.integrate import solve_ivp

from sklearn.feature_extraction.text import TfidfVectorizer

from sklearn.cluster import KMeans

 

df = population_raw.copy()

df = df.rename({"Province.State""Province""Country.Region""Country"}, axis=1)

cols = ["Country""Province""Population"]

df = df.loc[:, cols].fillna("-")

df.loc[df["Country"] == df["Province"], "Province"] = "-"

# Add total records

_total_df = df.loc[df["Province"] != "-", :].groupby("Country").sum()

_total_df = _total_df.reset_index().assign(Province="-")

df = pd.concat([df, _total_df], axis=0, sort=True)

df = df.drop_duplicates(subset=["Country""Province"], keep="first")

# Global

global_value = df.loc[df["Province"] == "-""Population"].sum()

df = df.append(pd.Series(["Global""-", global_value], index=cols), ignore_index=True)

# Sorting

df = df.sort_values("Population", ascending=False).reset_index(drop=True)

df = df.loc[:, cols]

population_df = df.copy()

population_df.head()

 

df = population_df.loc[population_df["Province"] == "-", :]

population_dict = df.set_index("Country").to_dict()["Population"]

population_dict

 

def line_plot(dftitlexlabel=Noneylabel="Cases",

              h=Nonev=Nonexlim=(NoneNone), ylim=(0None),

              math_scale=Truey_logscale=Falsey_integer=False,

              show_legend=Truebbox_to_anchor=(1.020),  bbox_loc="lower left"):

    """

    Show chlonological change of the data.

    """

    ax = df.plot()

    if math_scale:

        ax.yaxis.set_major_formatter(ScalarFormatter(useMathText=True))

        ax.ticklabel_format(style="sci",  axis="y",scilimits=(00))

    if y_logscale:

        ax.set_yscale("log")

        if ylim[0] == 0:

            ylim = (NoneNone)

    if y_integer:

        fmt = matplotlib.ticker.ScalarFormatter(useOffset=False)

        fmt.set_scientific(False)

        ax.yaxis.set_major_formatter(fmt)

    ax.set_title(title)

    ax.set_xlabel(xlabel)

    ax.set_ylabel(ylabel)

    ax.set_xlim(*xlim)

    ax.set_ylim(*ylim)

    if show_legend:

        ax.legend(bbox_to_anchor=bbox_to_anchor, loc=bbox_loc, borderaxespad=0)

    else:

        ax.legend().set_visible(False)

    if h is not None:

        ax.axhline(y=h, color="black", linestyle="--")

    if v is not None:

        if not isinstance(v, list):

            v = [v]

        for value in v:

            ax.axvline(x=value, color="black", linestyle="--")

    plt.tight_layout()

    plt.show()

class Estimator(object):

    def __init__(selfmodelncov_dftotal_populationname=Noneplaces=Noneareas=None,

                 excluded_places=Nonestart_date=Noneend_date=Nonedate_format="%d%b%Y", **params):

        """

        Set training data.

        @model <ModelBase>: the model

        @name <str>: name of the area

        @params: fixed parameter of the model

        @the other params: See the function named create_target_df()

        """

        # Fixed parameters

        self.fixed_param_dict = params.copy()

        if None in params.values():

            self.fixed_param_dict = {

                k: v for (k, v) in params.items() if v is not None

            }

        # Register the dataset arranged for the model

        dataset = model.create_dataset(

            ncov_df, total_population, places=places, areas=areas,

            excluded_places=excluded_places,

            start_date=start_date, end_date=end_date, date_format=date_format

        )

        self.start_time, self.initials, self.Tend, self.train_df = dataset

        self.total_population = total_population

        self.name = name

        self.model = model

        self.param_dict = dict()

        self.study = None

        self.optimize_df = None

 

    def run(selfn_trials=500):

        """

        Try estimation (optimization of parameters and tau).

        @n_trials <int>: the number of trials

        """

        if self.study is None:

            self.study = optuna.create_study(direction="minimize")

        self.study.optimize(

            lambda x: self.objective(x),

            n_trials=n_trials,

            n_jobs=-1

        )

        param_dict = self.study.best_params.copy()

        param_dict.update(self.fixed_param_dict)

        param_dict["R0"] = self.calc_r0()

        param_dict["score"] = self.score()

        param_dict.update(self.calc_days_dict())

        self.param_dict = param_dict.copy()

        return param_dict

 

    def history_df(self):

        """

        Return the hsitory of optimization.

        @return <pd.DataFrame>

        """

        optimize_df = self.study.trials_dataframe()

        optimize_df["time[s]"] = optimize_df["datetime_complete"] - \

            optimize_df["datetime_start"]

        optimize_df["time[s]"] = optimize_df["time[s]"].dt.total_seconds()

        self.optimize_df = optimize_df.drop(

            ["datetime_complete""datetime_start""system_attrs__number"], axis=1)

        return self.optimize_df.sort_values("value", ascending=True)

 

    def history_graph(self):

        """

        Show the history of parameter search using pair-plot.

        """

        if self.optimize_df is None:

            self.history_df()

        df = self.optimize_df.copy()

        sns.pairplot(df.loc[:, df.columns.str.startswith(

            "params_")], diag_kind="kde", markers="+")

        plt.show()

 

    def objective(selftrial):

        # Time

        try:

            tau = self.fixed_param_dict["tau"]

        except KeyError:

            tau = trial.suggest_int("tau"11440)

        train_df_divided = self.train_df.copy()

        train_df_divided["t"] = (train_df_divided["T"] / tau).astype(np.int64)

        # Parameters

        param_dict = self.model.param_dict(train_df_divided)

        p_dict = {"tau"None}

        p_dict.update(

            {

                k: trial.suggest_uniform(k, *v)

                for (k, v) in param_dict.items()

            }

        )

        p_dict.update(self.fixed_param_dict)

        p_dict.pop("tau")

        # Simulation

        t_end = train_df_divided.loc[train_df_divided.index[-1], "t"]

        sim_df = simulation(self.model, self.initials, step_n=t_end, **p_dict)

        return self.error_f(train_df_divided, sim_df)

 

    def error_f(selftrain_df_dividedsim_df):

        """

        We need to minimize the difference of the observed values and estimated values.

        This function calculate the difference of the estimated value and obsereved value.

        """

        n = self.total_population

        df = pd.merge(train_df_divided, sim_df, on="t", suffixes=("_observed""_estimated"))

        diffs = [

            # Weighted Average: the recent data is more important

            p * np.average(

                abs(df[f"{v}_observed"] - df[f"{v}_estimated"]) / (df[f"{v}_observed"] * n + 1),

                weights=df["t"]

            )

            for (p, v) in zip(self.model.PRIORITIES, self.model.VARIABLES)

        ]

        return sum(diffs) * n

 

    def compare_df(self):

        """

        Show the taining data and simulated data in one dataframe.

 

        """

        est_dict = self.study.best_params.copy()

        est_dict.update(self.fixed_param_dict)

        tau = est_dict["tau"]

        est_dict.pop("tau")

        observed_df = self.train_df.drop("T", axis=1)

        observed_df["t"] = (self.train_df["T"] / tau).astype(int)

        t_end = observed_df.loc[observed_df.index[-1], "t"]

        sim_df = simulation(self.model, self.initials, step_n=t_end, **est_dict)

        df = pd.merge(observed_df, sim_df, on="t", suffixes=("_observed""_estimated"))

        df = df.set_index("t")

        return df

 

    def compare_graph(self):

        """

        Compare obsereved and estimated values in graphs.

        """

        df = self.compare_df()

        use_variables = [

            v for (i, (p, v)) in enumerate(zip(self.model.PRIORITIES, self.model.VARIABLES))

            if p != 0 and i != 0

        ]

        val_len = len(use_variables) + 1

        fig, axes = plt.subplots(

            ncols=1, nrows=val_len, figsize=(96 * val_len / 2))

        for (ax, v) in zip(axes.ravel()[1:], use_variables):

            df[[f"{v}_observed"f"{v}_estimated"]].plot.line(

                ax=ax, ylim=(0None), sharex=True,

                title=f"{self.model.NAME}: Comparison of observed/estimated {v}(t)"

            )

            ax.yaxis.set_major_formatter(ScalarFormatter(useMathText=True))

            ax.ticklabel_format(style="sci",  axis="y", scilimits=(00))

            ax.legend(bbox_to_anchor=(1.020),

                      loc="lower left", borderaxespad=0)

        for v in use_variables:

            df[f"{v}_diff"] = df[f"{v}_observed"] - df[f"{v}_estimated"]

            df[f"{v}_diff"].plot.line(

                ax=axes.ravel()[0], sharex=True,

                title=f"{self.model.NAME}: observed - estimated"

            )

        axes.ravel()[0].axhline(y=0, color="black", linestyle="--")

        axes.ravel()[0].yaxis.set_major_formatter(

            ScalarFormatter(useMathText=True))

        axes.ravel()[0].ticklabel_format(

            style="sci",  axis="y", scilimits=(00))

        axes.ravel()[0].legend(bbox_to_anchor=(1.020),

                               loc="lower left", borderaxespad=0)

        fig.tight_layout()

        fig.show()

 

    def calc_r0(self):

        """

        Calculate R0.

        """

        est_dict = self.study.best_params.copy()

        est_dict.update(self.fixed_param_dict)

        est_dict.pop("tau")

        model_instance = self.model(**est_dict)

        return model_instance.calc_r0()

 

    def calc_days_dict(self):

        """

        Calculate 1/beta etc.

        """

        est_dict = self.study.best_params.copy()

        est_dict.update(self.fixed_param_dict)

        tau = est_dict["tau"]

        est_dict.pop("tau")

        model_instance = self.model(**est_dict)

        return model_instance.calc_days_dict(tau)

 

    def predict_df(selfstep_n):

        """

        Predict the values in the future.

        @step_n <int>: the number of steps

        @return <pd.DataFrame>: predicted data for measurable variables.

        """

        est_dict = self.study.best_params.copy()

        est_dict.update(self.fixed_param_dict)

        tau = est_dict["tau"]

        est_dict.pop("tau")

        df = simulation(self.model, self.initials, step_n=step_n, **est_dict)

        df["Time"] = (

            df["t"] * tau).apply(lambda x: timedelta(minutes=x)) + self.start_time

        df = df.set_index("Time").drop("t", axis=1)

        df = (df * self.total_population).astype(np.int64)

        upper_cols = [n.upper() for n in df.columns]

        df.columns = upper_cols

        df = self.model.calc_variables_reverse(df).drop(upper_cols, axis=1)

        return df

 

    def predict_graph(selfstep_nname=Noneexcluded_cols=None):

        """

        Predict the values in the future and create a figure.

        @step_n <int>: the number of steps

        @name <str>: name of the area

        @excluded_cols <list[str]>: the excluded columns in the figure

        """

        if self.name is not None:

            name = self.name

        else:

            name = str() if name is None else name

        df = self.predict_df(step_n=step_n)

        if excluded_cols is not None:

            df = df.drop(excluded_cols, axis=1)

        r0 = self.param_dict["R0"]

        title = f"Prediction in {name} with {self.model.NAME} model: R0 = {r0}"

        today = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)

        line_plot(df, title, v=today, h=self.total_population)

 

    def rmsle(selfcompare_df):

        """

        Return the value of RMSLE.

        @param compare_df <pd.DataFrame>

        """

        df = compare_df.set_index("t") * self.total_population

        score = 0

        for (priority, v) in zip(self.model.PRIORITIES, self.model.VARIABLES):

            if priority == 0:

                continue

            observed, estimated = df[f"{v}_observed"], df[f"{v}_estimated"]

            diff = (np.log10(observed + 1) - np.log10(estimated + 1))

            score += (diff ** 2).sum()

        rmsle = np.sqrt(score / len(df) * 2)

        return rmsle

 

    def score(self):

        """

        Return the value of RMSLE.

        """

        rmsle = self.rmsle(self.compare_df().reset_index("t"))

        return rmsle

 

    def info(self):

        """

        Return Estimater information.

        @return <tupple[object]>:

            - <ModelBase>: model

            - <dict[str]=str>: name, total_population, start_time, tau

            - <dict[str]=float>: values of parameters of model

        """

        param_dict = self.study.best_params.copy()

        param_dict.update(self.fixed_param_dict)

        info_dict = {

            "name"self.name,

            "total_population"self.total_population,

            "start_time"self.start_time,

            "tau": param_dict["tau"],

            "initials"self.initials

        }

        param_dict.pop("tau")

        return (self.model, info_dict, param_dict)

得到部分图表如下:

利用Python对武汉新型冠状肺炎SEIR模型进行数据可视化