气流:如何删除DAG?
我已经启动了Airflow网络服务器并安排了一些dag。我可以在Web GUI上看到dag。气流:如何删除DAG?
如何从运行中删除特定的DAG并在Web GUI中显示?有没有一个Airflow CLI命令来做到这一点?
我环顾四周,但无法找到一个简单的方法来删除DAG,一旦它已被加载和计划的答案。
Airflow没有内置的功能可以为您做到这一点。为了删除DAG,请将其从存储库中删除,并删除Airflow Metastore表中的数据库条目 - dag。
我还必须重新启动计划和网络服务器所在的机器运行完成清理。简单地重新启动Web服务器和调度程序是不够的。 –
我刚刚写了一个脚本,删除与特定dag相关的所有内容,但这仅适用于MySQL。如果您使用PostgreSQL,则可以编写不同的连接器方法。最初由兰斯发布的命令是https://groups.google.com/forum/#!topic/airbnb_airflow/GVsNsUxPRC0 我只是把它放在脚本中。希望这可以帮助。格式:蟒蛇script.py dag_id
import sys
import MySQLdb
dag_input = sys.argv[1]
query = {'delete from xcom where dag_id = "' + dag_input + '"',
'delete from task_instance where dag_id = "' + dag_input + '"',
'delete from sla_miss where dag_id = "' + dag_input + '"',
'delete from log where dag_id = "' + dag_input + '"',
'delete from job where dag_id = "' + dag_input + '"',
'delete from dag_run where dag_id = "' + dag_input + '"',
'delete from dag where dag_id = "' + dag_input + '"' }
def connect(query):
db = MySQLdb.connect(host="hostname", user="username", passwd="password", db="database")
cur = db.cursor()
cur.execute(query)
db.commit()
db.close()
return
for value in query:
print value
connect(value)
不知道为什么Apache的气流没有明显的和简单的方法来删除DAG
这是PR公开,但尚未合并。对于那些感兴趣的链接 - https://github.com/apache/incubator-airflow/pull/2199。 –
这是使用PostgresHook我适应代码默认的connection_id。
import sys
from airflow.hooks.postgres_hook import PostgresHook
dag_input = sys.argv[1]
hook=PostgresHook(postgres_conn_id= "airflow_db")
for t in ["xcom", "task_instance", "sla_miss", "log", "job", "dag_run", "dag" ]:
sql="delete from {} where dag_id='{}'".format(t, dag_input)
hook.run(sql, True)
我认为你也可以将'task_fail'和'dag_stats'添加到表 – marengaz
我已经编写了一个脚本,用于删除与默认SQLite数据库相关的特定dag的所有元数据。这是基于耶稣的回答,但是从Postgres改编为SQLite。用户应将../airflow.db
设置为相对于默认airflow.db文件(通常为~/airflow
)存储script.py的任何位置。要执行,请使用python script.py dag_id
。
import sqlite3
import sys
conn = sqlite3.connect('../airflow.db')
c = conn.cursor()
dag_input = sys.argv[1]
for t in ["xcom", "task_instance", "sla_miss", "log", "job", "dag_run", "dag" ]:
query = "delete from {} where dag_id='{}'".format(t, dag_input)
c.execute(query)
conn.commit()
conn.close()
这个表的列表中,这是一个很好的解决方案,至少在PR合并之前是这样的 –
您可以清除一组任务实例,就好像他们从来没有跑:
airflow clear dag_id -s 2017-1-23 -e 2017-8-31
然后从DAG的文件夹
这可能会导致'dag'表中有一些未清理的数据 – Chengzhi
没有CLI这个删除DAG文件。但是,如果您想要尝试恢复它,那么就会放弃pull请求:https://github.com/apache/incubator-airflow/pull/1344 – TheF1rstPancake