|
19 | 19 | from typing import Optional
|
20 | 20 |
|
21 | 21 | import sqlalchemy_jsonfield
|
22 |
| -from sqlalchemy import Column, String, and_, tuple_ |
| 22 | +from sqlalchemy import Column, String, and_, not_, tuple_ |
23 | 23 | from sqlalchemy.orm import Session
|
24 | 24 |
|
25 | 25 | from airflow.configuration import conf
|
@@ -109,29 +109,52 @@ def delete_old_records(
|
109 | 109 | if num_to_keep <= 0:
|
110 | 110 | return
|
111 | 111 |
|
112 |
| - # Fetch Top X records given dag_id & task_id ordered by Execution Date |
113 |
| - subq1 = ( |
114 |
| - session |
115 |
| - .query(cls.dag_id, cls.task_id, cls.execution_date) |
116 |
| - .filter(cls.dag_id == dag_id, cls.task_id == task_id) |
117 |
| - .order_by(cls.execution_date.desc()) |
| 112 | + tis_to_keep_query = session \ |
| 113 | + .query(cls.dag_id, cls.task_id, cls.execution_date) \ |
| 114 | + .filter(cls.dag_id == dag_id, cls.task_id == task_id) \ |
| 115 | + .order_by(cls.execution_date.desc()) \ |
118 | 116 | .limit(num_to_keep)
|
119 |
| - .subquery('subq1') |
120 |
| - ) |
121 |
| - |
122 |
| - # Second Subquery |
123 |
| - # Workaround for MySQL Limitation (https://stackoverflow.com/a/19344141/5691525) |
124 |
| - # Limitation: This version of MySQL does not yet support |
125 |
| - # LIMIT & IN/ALL/ANY/SOME subquery |
126 |
| - subq2 = ( |
127 |
| - session |
128 |
| - .query(subq1.c.dag_id, subq1.c.task_id, subq1.c.execution_date) |
129 |
| - .subquery('subq2') |
130 |
| - ) |
131 |
| - |
132 |
| - session.query(cls) \ |
133 |
| - .filter(and_( |
134 |
| - cls.dag_id == dag_id, |
135 |
| - cls.task_id == task_id, |
136 |
| - tuple_(cls.dag_id, cls.task_id, cls.execution_date).notin_(subq2))) \ |
137 |
| - .delete(synchronize_session=False) |
| 117 | + |
| 118 | + if session.bind.dialect.name in ["postgresql", "sqlite"]: |
| 119 | + # Fetch Top X records given dag_id & task_id ordered by Execution Date |
| 120 | + subq1 = tis_to_keep_query.subquery('subq1') |
| 121 | + |
| 122 | + session.query(cls) \ |
| 123 | + .filter(and_( |
| 124 | + cls.dag_id == dag_id, |
| 125 | + cls.task_id == task_id, |
| 126 | + tuple_(cls.dag_id, cls.task_id, cls.execution_date).notin_(subq1))) \ |
| 127 | + .delete(synchronize_session=False) |
| 128 | + elif session.bind.dialect.name in ["mysql"]: |
| 129 | + # Fetch Top X records given dag_id & task_id ordered by Execution Date |
| 130 | + subq1 = tis_to_keep_query.subquery('subq1') |
| 131 | + |
| 132 | + # Second Subquery |
| 133 | + # Workaround for MySQL Limitation (https://stackoverflow.com/a/19344141/5691525) |
| 134 | + # Limitation: This version of MySQL does not yet support |
| 135 | + # LIMIT & IN/ALL/ANY/SOME subquery |
| 136 | + subq2 = ( |
| 137 | + session |
| 138 | + .query(subq1.c.dag_id, subq1.c.task_id, subq1.c.execution_date) |
| 139 | + .subquery('subq2') |
| 140 | + ) |
| 141 | + |
| 142 | + session.query(cls) \ |
| 143 | + .filter(and_( |
| 144 | + cls.dag_id == dag_id, |
| 145 | + cls.task_id == task_id, |
| 146 | + tuple_(cls.dag_id, cls.task_id, cls.execution_date).notin_(subq2))) \ |
| 147 | + .delete(synchronize_session=False) |
| 148 | + else: |
| 149 | + # Fetch Top X records given dag_id & task_id ordered by Execution Date |
| 150 | + tis_to_keep = tis_to_keep_query.all() |
| 151 | + |
| 152 | + filter_tis = [not_(and_( |
| 153 | + cls.dag_id == ti.dag_id, |
| 154 | + cls.task_id == ti.task_id, |
| 155 | + cls.execution_date == ti.execution_date |
| 156 | + )) for ti in tis_to_keep] |
| 157 | + |
| 158 | + session.query(cls) \ |
| 159 | + .filter(and_(*filter_tis)) \ |
| 160 | + .delete(synchronize_session=False) |
0 commit comments