气流:如何删除DAG?

问题描述:

我已经启动了Airflow网络服务器并安排了一些dag。我可以在Web GUI上看到dag。气流:如何删除DAG?

如何从运行中删除特定的DAG并在Web GUI中显示?有没有一个Airflow CLI命令来做到这一点?

我环顾四周,但无法找到一个简单的方法来删除DAG,一旦它已被加载和计划的答案。

+0

没有CLI这个删除DAG文件。但是,如果您想要尝试恢复它,那么就会放弃pull请求:https://github.com/apache/incubator-airflow/pull/1344 – TheF1rstPancake

Airflow没有内置的功能可以为您做到这一点。为了删除DAG,请将其从存储库中删除,并删除Airflow Metastore表中的数据库条目 - dag。

+0

我还必须重新启动计划和网络服务器所在的机器运行完成清理。简单地重新启动Web服务器和调度程序是不够的。 –

我刚刚写了一个脚本,删除与特定dag相关的所有内容,但这仅适用于MySQL。如果您使用PostgreSQL,则可以编写不同的连接器方法。最初由兰斯发布的命令是https://groups.google.com/forum/#!topic/airbnb_airflow/GVsNsUxPRC0 我只是把它放在脚本中。希望这可以帮助。格式:蟒蛇script.py dag_id

import sys 
import MySQLdb 

dag_input = sys.argv[1] 

query = {'delete from xcom where dag_id = "' + dag_input + '"', 
     'delete from task_instance where dag_id = "' + dag_input + '"', 
     'delete from sla_miss where dag_id = "' + dag_input + '"', 
     'delete from log where dag_id = "' + dag_input + '"', 
     'delete from job where dag_id = "' + dag_input + '"', 
     'delete from dag_run where dag_id = "' + dag_input + '"', 
     'delete from dag where dag_id = "' + dag_input + '"' } 

def connect(query): 
     db = MySQLdb.connect(host="hostname", user="username", passwd="password", db="database") 
     cur = db.cursor() 
     cur.execute(query) 
     db.commit() 
     db.close() 
     return 

for value in query: 
     print value 
     connect(value) 

不知道为什么Apache的气流没有明显的和简单的方法来删除DAG

提起https://issues.apache.org/jira/browse/AIRFLOW-1002

+2

这是PR公开,但尚未合并。对于那些感兴趣的链接 - https://github.com/apache/incubator-airflow/pull/2199。 –

这是使用PostgresHook我适应代码默认的connection_id。

import sys 
from airflow.hooks.postgres_hook import PostgresHook 

dag_input = sys.argv[1] 
hook=PostgresHook(postgres_conn_id= "airflow_db") 

for t in ["xcom", "task_instance", "sla_miss", "log", "job", "dag_run", "dag" ]: 
    sql="delete from {} where dag_id='{}'".format(t, dag_input) 
    hook.run(sql, True) 
+2

我认为你也可以将'task_fail'和'dag_stats'添加到表 – marengaz

我已经编写了一个脚本,用于删除与默认SQLite数据库相关的特定dag的所有元数据。这是基于耶稣的回答,但是从Postgres改编为SQLite。用户应将../airflow.db设置为相对于默认airflow.db文件(通常为~/airflow)存储script.py的任何位置。要执行,请使用python script.py dag_id

import sqlite3 
import sys 

conn = sqlite3.connect('../airflow.db') 
c = conn.cursor() 

dag_input = sys.argv[1] 

for t in ["xcom", "task_instance", "sla_miss", "log", "job", "dag_run", "dag" ]: 
    query = "delete from {} where dag_id='{}'".format(t, dag_input) 
    c.execute(query) 

conn.commit() 
conn.close() 
+0

这个表的列表中,这是一个很好的解决方案,至少在PR合并之前是这样的 –

您可以清除一组任务实例,就好像他们从来没有跑:

airflow clear dag_id -s 2017-1-23 -e 2017-8-31 

然后从DAG的文件夹

+1

这可能会导致'dag'表中有一些未清理的数据 – Chengzhi