Memecahkan masalah penjadwal Airflow

Cloud Composer 1 | Cloud Composer 2

Halaman ini memberikan langkah-langkah pemecahan masalah dan informasi untuk masalah umum pada penjadwal Airflow.

Mengidentifikasi sumber masalah

Untuk memulai pemecahan masalah, identifikasi apakah masalah terjadi pada waktu penguraian DAG atau saat memproses tugas pada waktu eksekusi. Untuk mengetahui informasi selengkapnya tentang waktu penguraian dan waktu eksekusi, baca Perbedaan antara waktu penguraian DAG dan waktu eksekusi DAG.

Memeriksa log Prosesor DAG

Jika Anda memiliki DAG yang kompleks, maka Prosesor DAG, yang dijalankan oleh penjadwal, mungkin tidak mengurai semua DAG Anda. Hal ini dapat menyebabkan banyak masalah dengan gejala berikut.

Gejala:

  • Jika Prosesor DAG mengalami masalah saat mengurai DAG, hal tersebut mungkin menyebabkan kombinasi masalah yang tercantum di bawah. Jika DAG dihasilkan secara dinamis, masalah ini mungkin lebih berdampak dibandingkan dengan DAG statis.

  • DAG tidak terlihat di UI Airflow dan UI DAG.

  • DAG tidak dijadwalkan untuk dieksekusi.

  • Terdapat error dalam log prosesor DAG, misalnya:

    dag-processor-manager [2023-04-21 21:10:44,510] {manager.py:1144} ERROR -
    Processor for /home/airflow/gcs/dags/dag-example.py with PID 68311 started
    at 2023-04-21T21:09:53.772793+00:00 has timed out, killing it.
    

    atau

    dag-processor-manager [2023-04-26 06:18:34,860] {manager.py:948} ERROR -
    Processor for /home/airflow/gcs/dags/dag-example.py exited with return
    code 1.
    
  • Penjadwal Airflow mengalami masalah yang menyebabkan penjadwal dimulai ulang.

  • Tugas Airflow yang dijadwalkan untuk dieksekusi akan dibatalkan dan DAG yang dijalankan untuk DAG yang gagal diurai mungkin ditandai sebagai failed. Contoh:

    airflow-scheduler Failed to get task '<TaskInstance: dag-example.task1--1
    manual__2023-04-17T10:02:03.137439+00:00 [removed]>' for dag
    'dag-example'. Marking it as removed.
    

Solusi:

  • Tingkatkan parameter yang terkait dengan penguraian DAG:

    • Tingkatkan dagbag-import-timeout menjadi minimal 120 detik (atau lebih, jika diperlukan).

    • Tingkatkan dag-file-processor-timeout hingga minimal 180 detik (atau lebih, jika diperlukan). Nilai ini harus lebih tinggi dari dagbag-import-timeout.

  • Perbaiki atau hapus DAG yang menyebabkan masalah pada prosesor DAG.

Memeriksa waktu penguraian DAG

Untuk memverifikasi apakah masalah terjadi pada waktu penguraian DAG, ikuti langkah-langkah berikut.

Konsol

Di konsol Google Cloud, Anda dapat menggunakan halaman Monitoring dan tab Logs untuk memeriksa waktu penguraian DAG.

Periksa waktu penguraian DAG dengan halaman Cloud Composer Monitoring:

  1. Di konsol Google Cloud, buka halaman Environments.

    Buka Lingkungan

  2. Pada daftar lingkungan, klik nama lingkungan Anda. Halaman Monitoring akan terbuka.

  3. Di tab Monitoring, tinjau diagram Total waktu penguraian untuk semua file DAG di bagian DAG running dan identifikasi kemungkinan masalah.

    Bagian DAG run di tab Composer Monitoring menampilkan metrik respons untuk DAG di lingkungan Anda

Periksa waktu penguraian DAG dengan tab Cloud Composer Logs:

  1. Di konsol Google Cloud, buka halaman Environments.

    Buka Lingkungan

  2. Pada daftar lingkungan, klik nama lingkungan Anda. Halaman Monitoring akan terbuka.

  3. Buka tab Logs, dan dari hierarki navigasi All logs, pilih bagian DAG pemroses manager.

  4. Tinjau log dag-processor-manager dan identifikasi kemungkinan masalah.

    Log prosesor DAG akan menampilkan waktu penguraian DAG

gcloud

Gunakan perintah dags report untuk melihat waktu penguraian untuk semua DAG Anda.

gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    dags report

Ganti:

  • ENVIRONMENT_NAME dengan nama lingkungan.
  • LOCATION dengan region tempat lingkungan berada.

Output perintah terlihat mirip dengan berikut ini:

Executing within the following Kubernetes cluster namespace: composer-2-0-31-airflow-2-3-3
file                  | duration       | dag_num | task_num | dags
======================+================+=========+==========+===================
/manydagsbig.py       | 0:00:00.038334 | 2       | 10       | serial-0,serial-0
/airflow_monitoring.py| 0:00:00.001620 | 1       | 1        | airflow_monitoring

Cari nilai duration untuk setiap dag yang tercantum dalam tabel. Nilai yang besar dapat menunjukkan bahwa salah satu DAG Anda tidak diterapkan dengan cara yang optimal. Dari tabel output, Anda dapat mengidentifikasi DAG mana yang memiliki waktu penguraian yang lama.

Memantau tugas yang sedang berjalan dan dalam antrean

Untuk memeriksa apakah ada tugas yang tertahan dalam antrean, ikuti langkah-langkah berikut.

  1. Di konsol Google Cloud, buka halaman Environments.

    Buka Lingkungan

  2. Pada daftar lingkungan, klik nama lingkungan Anda. Halaman Detail lingkungan akan terbuka.

  3. Buka tab Pemantauan.

  4. Di tab Monitoring, tinjau diagram Airflow jobs di bagian DAG running dan identifikasi kemungkinan masalah. Tugas Airflow adalah tugas yang berada dalam status antrean di Airflow. Tugas tersebut dapat masuk ke antrean broker Celery atau Kubernetes Executor. Tugas dalam antrean seledri adalah instance tugas yang dimasukkan ke dalam antrean broker Celery.

Memecahkan masalah pada waktu penguraian DAG

Bagian berikut menjelaskan gejala dan kemungkinan perbaikan untuk beberapa masalah umum pada waktu penguraian DAG.

Jumlah thread terbatas

Mengizinkan pengelola prosesor DAG (bagian dari penjadwal yang memproses file DAG) untuk hanya menggunakan thread dalam jumlah terbatas dapat memengaruhi waktu penguraian DAG Anda.

Untuk mengatasi masalah ini, ganti opsi konfigurasi Airflow berikut:

  • Untuk Airflow 1.10.12 dan versi yang lebih lama, ganti parameter max_threads:

    Bagian Kunci Nilai Notes
    scheduler max_threads NUMBER_OF_CORES_IN_MACHINE - 1 Ganti NUMBER_OF_CORES_IN_MACHINE dengan jumlah core
    di mesin node pekerja.
  • Untuk Airflow 1.10.14 dan versi yang lebih baru, ganti parameter parsing_processes:

    Bagian Kunci Nilai Notes
    scheduler parsing_processes NUMBER_OF_CORES_IN_MACHINE - 1 Ganti NUMBER_OF_CORES_IN_MACHINE dengan jumlah core
    di mesin node pekerja.

Distribusi jumlah dan waktu tugas

Airflow dikenal memiliki masalah terkait penjadwalan tugas kecil dalam jumlah besar. Dalam situasi seperti itu, Anda harus memilih tugas yang lebih terkonsolidasi dalam jumlah yang lebih sedikit.

Menjadwalkan DAG atau tugas dalam jumlah besar secara bersamaan mungkin juga menjadi kemungkinan sumber masalah. Untuk menghindari masalah ini, seiring waktu, distribusikan tugas Anda secara lebih merata.

Memecahkan masalah terkait tugas yang sedang berjalan dan dalam antrean

Bagian berikut ini menjelaskan gejala dan kemungkinan perbaikan untuk beberapa masalah umum terkait tugas yang sedang berjalan dan dalam antrean.

Task queue terlalu panjang

Dalam beberapa kasus, task queue mungkin terlalu panjang untuk penjadwal. Untuk mengetahui informasi cara mengoptimalkan parameter worker dan seledri, baca artikel menskalakan lingkungan Cloud Composer bersama bisnis Anda.

Menggunakan fitur TimeTable penjadwal Airflow

Mulai dari Airflow 2.2, Anda dapat menentukan tabel waktu untuk DAG menggunakan fitur baru bernama TimeTable.

Anda dapat menentukan tabel waktu menggunakan salah satu metode berikut:

Resource cluster terbatas

Bagian ini hanya berlaku untuk Cloud Composer 1.

Anda mungkin mengalami masalah performa jika cluster GKE lingkungan Anda terlalu kecil untuk menangani semua DAG dan tugas. Dalam kasus ini, coba salah satu solusi berikut:

  • Buat lingkungan baru dengan jenis mesin yang memberikan performa lebih baik dan migrasikan DAG Anda ke lingkungan tersebut.
  • Buat lebih banyak lingkungan Cloud Composer dan pisahkan DAG di antara lingkungan tersebut.
  • Ubah jenis mesin untuk node GKE, seperti yang dijelaskan di Mengupgrade jenis mesin untuk node GKE. Karena prosedur ini rentan terhadap error, prosedur ini adalah opsi yang paling tidak direkomendasikan.
  • Upgrade jenis mesin dari instance Cloud SQL yang menjalankan database Airflow di lingkungan Anda, misalnya menggunakan perintah gcloud composer environments update. Performa database Airflow yang rendah mungkin menjadi penyebab penjadwal lambat.

Menghindari penjadwalan tugas selama masa pemeliharaan

Anda dapat menentukan masa pemeliharaan tertentu untuk lingkungan Anda. Selama periode waktu ini, peristiwa pemeliharaan untuk Cloud SQL dan GKE berlangsung.

Membuat penjadwal Airflow mengabaikan file yang tidak perlu

Anda dapat meningkatkan performa penjadwal Airflow dengan melewati file yang tidak perlu di folder DAG. Penjadwal Airflow mengabaikan file dan folder yang ditentukan dalam file .airflowignore.

Agar penjadwal Airflow mengabaikan file yang tidak perlu:

  1. Buat file .airflowignore.
  2. Dalam file ini, cantumkan file dan folder yang harus diabaikan.
  3. Upload file ini ke folder /dags di bucket lingkungan Anda.

Untuk informasi selengkapnya tentang format file .airflowignore, lihat dokumentasi Airflow.

Proses penjadwal airflow yang dijeda

Pengguna Airflow menjeda DAG untuk menghindari eksekusi. Tindakan ini akan menghemat siklus pemrosesan pekerja Airflow.

Scheduler Airflow akan terus mengurai DAG yang dijeda. Jika Anda benar-benar ingin meningkatkan performa penjadwal Airflow, gunakan .airflowignore atau hapus DAG yang dijeda dari folder DAG.

Penggunaan 'wait_for_downstream' di DAG Anda

Jika Anda menetapkan parameter wait_for_downstream ke True di DAG, agar tugas berhasil, semua tugas yang langsung berada di downstream tugas ini juga harus berhasil. Artinya, eksekusi tugas yang termasuk dalam proses DAG tertentu mungkin akan diperlambat oleh eksekusi tugas dari DAG sebelumnya. Baca selengkapnya di dokumentasi Airflow.

Tugas yang diantrekan terlalu lama akan dibatalkan dan dijadwalkan ulang

Jika tugas Airflow disimpan dalam antrean terlalu lama, penjadwal akan menjadwalkan ulang tugas tersebut untuk dieksekusi (dalam versi Airflow sebelum 2.3.1, tugas tersebut juga ditandai sebagai gagal dan dicoba ulang jika memenuhi syarat untuk dicoba ulang).

Salah satu cara untuk mengamati gejala situasi ini adalah dengan melihat diagram dengan jumlah tugas dalam antrean (tab "Monitoring" di UI Cloud Composer) dan jika lonjakan pada diagram ini tidak turun dalam waktu sekitar dua jam, kemungkinan besar tugas tersebut akan dijadwalkan ulang (tanpa log) diikuti dengan entri log "Tugas yang diadopsi masih tertunda ..." di log penjadwal. Dalam kasus tersebut, Anda mungkin melihat pesan "File log tidak ditemukan..." di log tugas Airflow karena tugas tidak dieksekusi.

Secara umum, perilaku ini wajar dan instance tugas terjadwal berikutnya dimaksudkan untuk dieksekusi sesuai jadwal. Jika Anda melihat banyak kasus seperti ini di lingkungan Cloud Composer, kemungkinan jumlah pekerja Airflow di lingkungan Anda tidak cukup untuk memproses semua tugas terjadwal.

Resolusi: Untuk mengatasi masalah ini, Anda perlu memastikan selalu ada kapasitas pada pekerja Airflow untuk menjalankan tugas dalam antrean. Misalnya, Anda dapat meningkatkan jumlah worker atau worker_concurrency. Anda juga dapat menyesuaikan paralelisme atau kumpulan untuk mencegah tugas mengantrekan melebihi kapasitas yang Anda miliki.

Secara sporadis, tugas yang sudah tidak berlaku mungkin memblokir eksekusi DAG tertentu

Dalam kasus umum, penjadwal Airflow harus dapat menangani situasi ketika ada tugas yang sudah tidak berlaku dalam antrean sehingga tidak dapat dijalankan dengan benar (misalnya, DAG tempat tugas lama tersebut telah dihapus).

Jika tugas yang sudah tidak berlaku ini tidak dihapus permanen oleh penjadwal, Anda mungkin perlu menghapusnya secara manual. Anda dapat melakukannya, misalnya, di UI Airflow - Anda dapat membuka (Menu > Browser > Instance Tugas), menemukan tugas dalam antrean milik DAG yang sudah tidak berlaku, lalu menghapusnya.

Untuk mengatasi masalah ini, upgrade lingkungan Anda ke Cloud Composer versi 2.1.12 atau yang lebih baru.

Pendekatan Cloud Composer untuk parameter [scheduler]min_file_process_interval

Cloud Composer mengubah cara penggunaan [scheduler]min_file_process_interval oleh penjadwal Airflow.

Aliran udara 1

Jika Cloud Composer menggunakan Airflow 1, pengguna dapat menetapkan nilai [scheduler]min_file_process_interval antara 0 dan 600 detik. Nilai yang lebih tinggi dari 600 detik memberikan hasil yang sama seperti jika [scheduler]min_file_process_interval disetel ke 600 detik.

Aliran udara 2

Di Airflow 2, [scheduler]min_file_process_interval hanya dapat digunakan dengan versi 1.19.9 dan 2.0.26 atau yang lebih baru

  • Cloud Composer versi yang lebih lama dari 1.19.9 dan 2.0.26

    Pada versi ini, [scheduler]min_file_process_interval diabaikan.

  • Cloud Composer versi 1.19.9 atau 2.0.26, atau versi yang lebih baru

    Penjadwal airflow dimulai ulang setelah beberapa kali semua DAG dijadwalkan dan parameter [scheduler]num_runs mengontrol frekuensinya dilakukan oleh penjadwal. Saat mencapai loop penjadwalan [scheduler]num_runs, penjadwal akan dimulai ulang. Penjadwal adalah komponen stateless, dan mulai ulang tersebut merupakan mekanisme pemulihan otomatis untuk setiap masalah yang mungkin dialami Penjadwal. Jika tidak ditentukan, nilai default [scheduler]num_runs akan diterapkan, yaitu 5.000.

    [scheduler]min_file_process_interval dapat digunakan untuk mengonfigurasi seberapa sering penguraian DAG terjadi, tetapi parameter ini tidak boleh lebih lama dari waktu yang diperlukan bagi penjadwal untuk melakukan loop [scheduler]num_runs saat menjadwalkan DAG.

Menskalakan konfigurasi Airflow

Airflow menyediakan opsi konfigurasi Airflow yang mengontrol jumlah tugas dan DAG yang dapat dijalankan Airflow secara bersamaan. Untuk menetapkan opsi konfigurasi ini, ganti nilainya untuk lingkungan Anda.

  • Pekerja Konkurensi

    Parameter [celery]worker_concurrency mengontrol jumlah tugas maksimum yang dapat dijalankan pekerja Airflow secara bersamaan. Jika Anda mengalikan nilai parameter ini dengan jumlah pekerja Airflow di lingkungan Cloud Composer, Anda akan mendapatkan jumlah tugas maksimum yang dapat dijalankan dalam momen tertentu di lingkungan Anda. Jumlah ini dibatasi oleh opsi konfigurasi Airflow [core]parallelism, yang akan dijelaskan lebih lanjut.

    Di lingkungan Cloud Composer 2, nilai default [celery]worker_concurrency dihitung secara otomatis

    • Untuk versi Airflow: 2.3.3 dan yang lebih baru, [celery]worker_concurrency ditetapkan ke nilai minimum 32, 12 * worker_CPU, dan 8 * worker_memory.

    • Untuk versi Airflow: 2.2.5 atau yang lebih lama, [celery]worker_concurrency ditetapkan ke 12 * jumlah CPU pekerja.

  • Maksimum Operasi DAG Aktif

    Opsi konfigurasi Airflow [core]max_active_runs_per_dag mengontrol jumlah maksimum DAG aktif yang dijalankan per DAG. Penjadwal tidak akan membuat operasi DAG lagi jika mencapai batas ini.

    Jika parameter ini tidak disetel dengan benar, Anda mungkin akan mengalami masalah saat penjadwal akan men-throttle eksekusi DAG karena tidak dapat membuat lebih banyak instance yang menjalankan DAG pada momen tertentu.

  • Tugas Aktif Maksimal Per DAG

    Opsi konfigurasi Airflow [core]max_active_tasks_per_dag mengontrol jumlah maksimum instance tugas yang dapat berjalan secara serentak di setiap DAG. Ini adalah parameter level DAG.

    Jika parameter ini tidak disetel dengan benar, Anda mungkin akan mengalami masalah saat eksekusi satu instance DAG lambat karena hanya ada sejumlah tugas DAG yang dapat dijalankan pada waktu tertentu.

    Solusi: tingkatkan [core]max_active_tasks_per_dag.

  • Paralelisme dan ukuran kumpulan

    Opsi konfigurasi Airflow [core]parallelism mengontrol jumlah tugas yang dapat diantrekan oleh penjadwal Airflow dalam antrean Executor setelah semua dependensi untuk tugas ini terpenuhi.

    Ini adalah parameter global untuk seluruh penyiapan Airflow.

    Tugas diantrekan dan dieksekusi dalam sebuah kumpulan. Lingkungan Cloud Composer hanya menggunakan satu kumpulan. Ukuran kumpulan ini mengontrol jumlah tugas yang dapat diantrekan oleh penjadwal untuk dieksekusi pada momen tertentu. Jika ukuran kumpulan terlalu kecil, penjadwal tidak dapat mengantrekan tugas untuk dieksekusi meskipun batas minimum, yang ditentukan oleh opsi konfigurasi [core]parallelism dan opsi konfigurasi [celery]worker_concurrency dikalikan dengan jumlah pekerja Airflow, belum terpenuhi.

    Anda dapat mengonfigurasi ukuran kumpulan di UI Airflow (Menu > Admin > Pools). Sesuaikan ukuran kumpulan dengan tingkat paralelisme yang Anda harapkan di lingkungan Anda.

    Biasanya, [core]parallelism ditetapkan sebagai produk dari jumlah pekerja maksimum dan [celery]worker_concurrency.

DAG tidak dijadwalkan oleh penjadwal karena waktu tunggu prosesor DAG habis

Untuk mengetahui informasi selengkapnya tentang masalah ini, lihat Memecahkan masalah DAG.

Menandai tugas sebagai gagal setelah mencapai dagrun_timeout

Penjadwal menandai tugas yang belum selesai (berjalan, dijadwalkan, dan diantrekan) sebagai gagal jika operasi DAG tidak selesai dalam dagrun_timeout (parameter DAG).

Solusi:

Gejala Database Airflow sedang di bawah tekanan beban

Terkadang di log penjadwal Airflow, Anda mungkin melihat entri log peringatan berikut:

Scheduler heartbeat got an exception: (_mysql_exceptions.OperationalError) (2006, "Lost connection to MySQL server at 'reading initial communication packet', system error: 0")"

Gejala serupa mungkin juga diamati di log pekerja Airflow:

Untuk MySQL:

(_mysql_exceptions.OperationalError) (2006, "Lost connection to MySQL server at
'reading initial communication packet', system error: 0")"

Untuk PostgreSQL:

psycopg2.OperationalError: connection to server at ... failed

Error atau peringatan tersebut mungkin merupakan gejala dari database Airflow yang kewalahan dengan jumlah koneksi terbuka atau jumlah kueri yang dijalankan pada saat yang sama, baik oleh penjadwal atau komponen Airflow lainnya seperti pekerja, pemicu, dan server web.

Solusi yang memungkinkan:

Server web menampilkan peringatan 'Penjadwal tampaknya tidak berjalan'

Penjadwal melaporkan detak jantungnya secara teratur ke database Airflow. Berdasarkan informasi ini, server web Airflow menentukan apakah penjadwal aktif.

Terkadang, jika penjadwal sedang memuat beban yang berat, maka mungkin tidak dapat melaporkan detak jantungnya setiap [scheduler]scheduler-heartbeat-sec.

Dalam situasi tersebut, server web Airflow mungkin menampilkan peringatan berikut:

The scheduler does not appear to be running. Last heartbeat was received <X>
seconds ago.

Solusi yang memungkinkan:

Solusi untuk masalah yang dialami selama pengisian ulang DAG

Terkadang, Anda mungkin ingin menjalankan kembali DAG yang sudah dieksekusi. Anda dapat melakukannya dengan alat command line Airflow dengan cara berikut:

Aliran udara 1

gcloud composer environments run \
  ENVIRONMENT_NAME \
  --location LOCATION \
  backfill -- -B \
  -s START_DATE \
  -e END_DATE \
  DAG_NAME

Untuk menjalankan kembali tugas yang gagal hanya untuk DAG tertentu, gunakan juga argumen --rerun_failed_tasks.

Aliran udara 2

gcloud composer environments run \
  ENVIRONMENT_NAME \
  --location LOCATION \
   dags backfill -- -B \
   -s START_DATE \
   -e END_DATE \
   DAG_NAME

Untuk menjalankan kembali tugas yang gagal hanya untuk DAG tertentu, gunakan juga argumen --rerun-failed-tasks.

Ganti:

  • ENVIRONMENT_NAME dengan nama lingkungan.
  • LOCATION dengan region tempat lingkungan berada.
  • START_DATE dengan nilai untuk parameter DAG start_date, dalam format YYYY-MM-DD.
  • END_DATE dengan nilai untuk parameter DAG end_date, dalam format YYYY-MM-DD.
  • DAG_NAME dengan nama DAG.

Operasi pengisian ulang terkadang dapat menghasilkan situasi deadlock ketika pengisian ulang tidak mungkin dilakukan karena ada kunci pada tugas. Contoh:

2022-11-08 21:24:18.198 CET DAG ID Task ID Run ID Try number
2022-11-08 21:24:18.201 CET -------- --------- -------- ------------
2022-11-08 21:24:18.202 CET 2022-11-08 21:24:18.203 CET These tasks are deadlocked:
2022-11-08 21:24:18.203 CET DAG ID Task ID Run ID Try number
2022-11-08 21:24:18.204 CET ----------------------- ----------- ----------------------------------- ------------
2022-11-08 21:24:18.204 CET <DAG name> <Task name> backfill__2022-10-27T00:00:00+00:00 1
2022-11-08 21:24:19.249 CET Command exited with return code 1
...
2022-11-08 21:24:19.348 CET Failed to execute job 627927 for task backfill

Dalam beberapa kasus, Anda dapat menggunakan solusi berikut untuk mengatasi deadlock:

  • Nonaktifkan Penjadwal mini dengan mengganti [core]schedule-after-task-execution ke False.

  • Jalankan pengisian ulang untuk rentang tanggal yang lebih sempit. Misalnya, tetapkan START_DATE dan END_DATE untuk menentukan periode hanya 1 hari.

Langkah selanjutnya