Ottimizza la scalabilità automatica orizzontale per le pipeline in modalità flusso

Nelle pipeline in modalità flusso con un volume elevato di dati di input, esiste generalmente un compromesso tra costo e latenza. Per mantenere una bassa latenza, Dataflow deve aggiungere worker man mano che il volume di traffico aumenta. Un altro fattore è la velocità con cui la pipeline deve fare lo scale up o lo scale down in risposta alle variazioni nella velocità dei dati di input.

Il gestore della scalabilità automatica di Dataflow ha impostazioni predefinite adatte a molti carichi di lavoro. Tuttavia, ti consigliamo di ottimizzare questo comportamento per il tuo scenario specifico. Ad esempio, potrebbe essere accettabile una latenza media più elevata per ridurre i costi oppure potresti voler fare lo scale up di Dataflow più rapidamente in risposta ai picchi di traffico.

Per ottimizzare la scalabilità automatica orizzontale, puoi regolare i seguenti parametri:

Imposta l'intervallo di scalabilità automatica

Quando crei un nuovo job di inserimento di flussi, puoi impostare il numero iniziale di worker e il numero massimo di worker. A questo scopo, specifica le seguenti opzioni della pipeline:

Java

  • --numWorkers: il numero iniziale di worker disponibili all'avvio della pipeline
  • --maxNumWorkers: numero massimo di worker disponibili per la tua pipeline

Python

  • --num_workers: il numero iniziale di worker disponibili all'avvio della pipeline
  • --max_num_workers: il numero massimo di worker disponibili per la tua pipeline

Go

  • --num_workers: il numero iniziale di worker disponibili all'avvio della pipeline
  • --max_num_workers: il numero massimo di worker disponibili per la tua pipeline

Per i job di elaborazione in modalità flusso che utilizzano Streaming Engine, il flag --maxNumWorkers è facoltativo. Il valore predefinito è 100. Per i job di flussi di dati che non utilizzano Streaming Engine, --maxNumWorkers è richiesto quando è abilitata la scalabilità automatica orizzontale.

Il valore iniziale di --maxNumWorkers determina anche il numero di dischi permanenti allocati per il job. Il deployment delle pipeline viene eseguito con un pool fisso di dischi permanenti, in numero uguale a --maxNumWorkers. Durante il flusso di dati, i dischi permanenti vengono ridistribuiti in modo che ogni worker riceva lo stesso numero di dischi collegati.

Se imposti --maxNumWorkers, assicurati che il valore fornisca dischi sufficienti per la pipeline. Considera la crescita futura quando imposti il valore iniziale. Per informazioni sulle prestazioni dei Persistent Disk, consulta Configurare il Persistent Disk e le VM. Dataflow fattura l'utilizzo di Persistent Disk e dispone di quote di Compute Engine, incluse quelle per i dischi permanenti.

Per impostazione predefinita, il numero minimo di worker è 1 per i job di flusso che utilizzano Streaming Engine e (maxNumWorkers/15), arrotondato per eccesso, per i job che non utilizzano Streaming Engine.

Aggiorna l'intervallo di scalabilità automatica

Per i job che utilizzano Streaming Engine, puoi regolare il numero minimo e massimo di worker, senza arrestare o sostituire il job. Per modificare questi valori, utilizza un aggiornamento del job in corso. Aggiorna le seguenti opzioni del job:

  • --min-num-workers: il numero minimo di worker.
  • --max-num-workers: il numero massimo di worker.

gcloud

Usa il comando gcloud dataflow jobs update-options:

gcloud dataflow jobs update-options \
  --region=REGION \
  --min-num-workers=MINIMUM_WORKERS \
  --max-num-workers=MAXIMUM_WORKERS \
  JOB_ID

Sostituisci quanto segue:

  • REGION: l'ID regione dell'endpoint a livello di regione del job
  • MINIMUM_WORKERS: numero minimo di istanze di Compute Engine
  • MAXIMUM_WORKERS: numero massimo di istanze di Compute Engine
  • JOB_ID: l'ID del job da aggiornare

Puoi anche aggiornare singolarmente --min-num-workers e --max-num-workers.

REST

Utilizza il metodo projects.locations.jobs.update:

PUT https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/jobs/JOB_ID?updateMask=runtime_updatable_params.max_num_workers,runtime_updatable_params.min_num_workers
{
  "runtime_updatable_params": {
    "min_num_workers": MINIMUM_WORKERS,
    "max_num_workers": MAXIMUM_WORKERS
  }
}

Sostituisci quanto segue:

  • PROJECT_ID: l'ID progetto Google Cloud del job Dataflow
  • REGION: l'ID regione dell'endpoint a livello di regione del job
  • JOB_ID: l'ID del job da aggiornare
  • MINIMUM_WORKERS: numero minimo di istanze di Compute Engine
  • MAXIMUM_WORKERS: numero massimo di istanze di Compute Engine

Puoi anche aggiornare min_num_workers e max_num_workers singolarmente. Specifica i parametri da aggiornare nel parametro di query updateMask e includi i valori aggiornati nel campo runtimeUpdatableParams del corpo della richiesta. L'esempio seguente aggiorna min_num_workers:

PUT https://dataflow.googleapis.com/v1b3/projects/my_project/locations/us-central1/jobs/job1?updateMask=runtime_updatable_params.min_num_workers
{
  "runtime_updatable_params": {
    "min_num_workers": 5
  }
}

Per i job che non utilizzano Streaming Engine, puoi sostituire il job esistente con un valore aggiornato di maxNumWorkers.

Se aggiorni un job di inserimento di flussi che non utilizza Streaming Engine, la scalabilità automatica orizzontale del job aggiornato è disabilitata per impostazione predefinita. Per mantenere la scalabilità automatica abilitata, specifica --autoscalingAlgorithm e --maxNumWorkers per il job aggiornato.

Imposta il suggerimento per l'utilizzo dei worker

Dataflow utilizza l'utilizzo medio della CPU come indicatore per capire quando applicare la scalabilità automatica orizzontale. Per impostazione predefinita, Dataflow imposta un utilizzo target della CPU pari a 0,8. Se l'utilizzo non rientra in questo intervallo, Dataflow potrebbe aggiungere o rimuovere worker.

Per un maggiore controllo sul comportamento della scalabilità automatica, puoi impostare l'utilizzo di destinazione della CPU su un valore compreso nell'intervallo [0,1, 0,9].

  • Imposta un valore di utilizzo della CPU più basso se vuoi ottenere latenze di picco più basse. Un valore più basso consente a Dataflow di fare lo scale out in modo più aggressivo in risposta all'aumento dell'utilizzo dei worker e di scalare in modo più conservativo per migliorare la stabilità. Un valore più basso fornisce anche un margine maggiore quando la pipeline è in esecuzione a uno stato stazionario, con una latenza di coda più bassa. (La latenza della coda misura i tempi di attesa più lunghi prima che un nuovo record venga elaborato.)

  • Imposta un valore più alto se vuoi risparmiare risorse e mantenere i costi più bassi quando il traffico aumenta. Un valore più elevato impedisce un upscaling eccessivo, a scapito di una maggiore latenza.

Per configurare il suggerimento sull'utilizzo quando esegui un job, imposta l'opzione di servizio worker_utilization_hint:

Java

--dataflowServiceOptions=worker_utilization_hint=TARGET_UTILIZATION

Sostituisci TARGET_UTILIZATION con un valore compreso nell'intervallo [0,1, 0,9].

Python

--dataflow_service_options=worker_utilization_hint=TARGET_UTILIZATION

Sostituisci TARGET_UTILIZATION con un valore compreso nell'intervallo [0,1, 0,9].

Go

--dataflow_service_options=worker_utilization_hint=TARGET_UTILIZATION

Sostituisci TARGET_UTILIZATION con un valore compreso nell'intervallo [0,1, 0,9].

Per le nuove pipeline, ti consigliamo di eseguire i test con carichi realistici utilizzando l'impostazione predefinita. Valuta quindi il comportamento della scalabilità automatica in base alla pipeline e apporta le modifiche necessarie.

Il suggerimento sull'utilizzo è solo uno dei fattori che Dataflow utilizza per decidere se scalare i worker. Altri fattori come il backlog e le chiavi disponibili possono sostituire il valore del suggerimento. Inoltre, il suggerimento non è un obiettivo preciso. Il gestore della scalabilità automatica cerca di mantenere l'utilizzo della CPU all'interno dell'intervallo del valore del suggerimento, ma la metrica di utilizzo aggregata potrebbe essere superiore o inferiore. Per ulteriori informazioni, consulta Euristica con scalabilità automatica dei flussi di dati.

Aggiorna il suggerimento sull'utilizzo

Per aggiornare il suggerimento sull'utilizzo mentre un job è in esecuzione, esegui un aggiornamento in corso come segue:

gcloud

Utilizza il comando gcloud dataflow jobs update-options:

gcloud dataflow jobs update-options \
  --region=REGION \
  -worker_utilization_hint=TARGET_UTILIZATION \
  JOB_ID

Sostituisci quanto segue:

  • REGION: l'ID regione dell'endpoint a livello di regione del job
  • JOB_ID: l'ID del job da aggiornare
  • TARGET_UTILIZATION: un valore compreso tra 0,1 e 0,9]

Per reimpostare il suggerimento di utilizzo sul valore predefinito, utilizza il seguente comando gcloud:

gcloud dataflow jobs update-options \
  --unset_worker_utilization_hint \
  --region=REGION \
  --project=PROJECT_ID \
  JOB_ID

REST

Utilizza il metodo projects.locations.jobs.update:

PUT https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/jobs/JOB_ID?updateMask=runtime_updatable_params.worker_utilization_hint
{
  "runtime_updatable_params": {
    "worker_utilization_hint": TARGET_UTILIZATION
  }
}

Sostituisci quanto segue:

  • PROJECT_ID: l'ID progetto Google Cloud del job Dataflow.
  • REGION: l'ID regione dell'endpoint a livello di regione del job.
  • JOB_ID: l'ID del job da aggiornare.
  • TARGET_UTILIZATION: un valore compreso tra 0,1 e 0,9]

Euristica a scalabilità automatica dei flussi di dati

Per le pipeline in modalità flusso, l'obiettivo della scalabilità automatica orizzontale è ridurre al minimo il backlog, massimizzando al contempo l'utilizzo e la velocità effettiva dei worker, nonché reagire rapidamente ai picchi di carico.

Dataflow prende in considerazione diversi fattori per la scalabilità automatica, tra cui:

  • Backlog. Il tempo di backlog stimato viene calcolato in base alla velocità effettiva e ai byte di backlog ancora da elaborare dall'origine di input. Una pipeline è considerata backlog quando il tempo di backlog stimato rimane superiore a 15 secondi.

  • Utilizzo CPU target. Il target predefinito per l'utilizzo medio della CPU è 0,8. Puoi eseguire l'override di questo valore.

  • Chiavi disponibili. Le chiavi sono l'unità fondamentale di parallelismo in Dataflow.

In alcuni casi, Dataflow utilizza i seguenti fattori nelle decisioni di scalabilità automatica. Se questi fattori vengono utilizzati per il job, puoi visualizzare queste informazioni nella scheda delle metriche Scalabilità automatica.

  • La limitazione basata su chiave utilizza il numero di chiavi di elaborazione ricevute dal job per calcolare il limite per i worker degli utenti, poiché ogni chiave può essere elaborata da un solo worker alla volta.

  • Smorzamento del downscale. Se Dataflow rileva che sono state prese decisioni di scalabilità automatica instabili, rallenta la frequenza di downscaling per migliorare la stabilità.

  • L'upscaling basato su CPU utilizza un elevato utilizzo della CPU come criterio di aumento delle dimensioni.

  • Per i job di flussi di dati che non utilizzano Streaming Engine, la scalabilità potrebbe essere vincolata dal numero di dischi permanenti. Per maggiori informazioni, consulta Impostare l'intervallo di scalabilità automatica.

Upscaling. Se una pipeline in modalità flusso rimane in backlog con sufficiente parallelismo sui worker per diversi minuti, Dataflow fa lo scale up. Dataflow tenta di cancellare il backlog entro circa 150 secondi dallo scale up, data la velocità effettiva attuale per worker. Se è presente un backlog, ma il worker non ha un parallelismo sufficiente per i worker aggiuntivi, la pipeline non fa lo scale up. (la scalabilità del numero di worker oltre il numero di chiavi disponibili per l'elaborazione parallela non aiuta a elaborare più rapidamente il backlog.)

Ridimensionamento Quando il gestore della scalabilità automatica prende una decisione di downscaling, il backlog è il fattore di priorità più alto. Il gestore della scalabilità automatica punta a un backlog non superiore a 15 secondi. Se il backlog scende al di sotto dei 10 secondi e l'utilizzo medio dei worker è inferiore al target di utilizzo della CPU, Dataflow fa lo scale down. Finché il backlog è accettabile, il gestore della scalabilità automatica cerca di mantenere l'utilizzo della CPU vicino all'utilizzo target della CPU. Tuttavia, se l'utilizzo è già sufficientemente vicino al target, il gestore della scalabilità automatica potrebbe mantenere invariato il numero di worker, poiché ogni passaggio di downscaling ha un costo.

Streaming Engine utilizza anche una tecnica di scalabilità automatica predittiva basata sul backlog del timer. I dati non limitati in una pipeline in modalità flusso sono divisi in finestre raggruppati per timestamp. Al termine di una finestra, i timer si attivano per ogni chiave elaborata al suo interno. L'attivazione di un timer indica che la finestra è scaduta per una determinata chiave. Streaming Engine può misurare il backlog del timer e prevedere quanti timer verranno attivati alla fine di una finestra. Utilizzando il backlog del timer come indicatore, Dataflow può stimare la quantità di elaborazione che deve verificarsi quando vengono attivati i timer futuri. In base al carico futuro stimato, Dataflow scala automaticamente in anticipo per soddisfare la domanda prevista.

Metriche

Per trovare i limiti di scalabilità automatica attuali per un job, esegui una query sulle seguenti metriche:

  • job/max_worker_instances_limit: numero massimo di worker.
  • job/min_worker_instances_limit: numero minimo di worker.

Per ottenere informazioni sull'utilizzo dei worker, esegui una query sulle seguenti metriche:

  • job/aggregated_worker_utilization: l'utilizzo aggregato dei worker.
  • job/worker_utilization_hint: il suggerimento attuale per l'utilizzo dei worker.

Per ottenere insight sul comportamento del gestore della scalabilità automatica, esegui una query sulla seguente metrica:

  • job.worker_utilization_hint_is_actively_used: indica se il gestore della scalabilità automatica sta utilizzando attivamente il suggerimento di utilizzo dei worker. Se altri fattori sostituiscono il suggerimento quando questa metrica viene campionata, il valore è false.
  • job/horizontal_worker_scaling: descrive le decisioni prese dal gestore della scalabilità automatica. Questa metrica contiene le seguenti etichette:
    • direction: specifica se il gestore della scalabilità automatica ha fatto lo scale up, lo scale down o non ha eseguito alcuna azione.
    • rationale: specifica il motivo della decisione del gestore della scalabilità automatica.

Per ulteriori informazioni, consulta la pagina relativa alle metriche di Cloud Monitoring. Queste metriche vengono visualizzate anche nei grafici di monitoraggio con scalabilità automatica.

Passaggi successivi