2015-10-14 9 views
8

Codice:flusso d'aria non si pianifica correttamente Python

Python versione 2.7.x e il flusso d'aria versione 1.5.1

mio script dag è questo

from airflow import DAG 
from airflow.operators import BashOperator 
from datetime import datetime, timedelta 


default_args = { 
'owner': 'Vignesh', 
'depends_on_past': False, 
'start_date': datetime(2015,10,13), 
'email': ['[email protected]'], 
'schedule_interval':timedelta(minutes=5), 
'email_on_failure': True, 
'email_on_retry': True, 
'retries': 1, 
'retry_delay': timedelta(minutes=5), 
} 
dag = DAG('testing', default_args=default_args) 
run_this_first = BashOperator(task_id='Start1',bash_command='date', dag=dag) 
for i in range(5): 
    t = BashOperator(task_id="Orders1"+str(i), bash_command='sleep 5',dag=dag) 
    t.set_upstream(run_this_first) 

Da che si poteva vedere che sto creando un DAG con 6 attività, la prima attività (Start1) inizia prima dopo la quale iniziano tutte le altre cinque attività

Attualmente mi hanno dato 5 minuti di ritardo tra l'inizio del DAG

Ha funzionato perfettamente per tutti i sei compiti del primo tipo, ma dopo cinque minuti il ​​DAG non è ri-avviato

E 'stato più di 1 Ora il DAG non è ancora riavviato. Davvero non so se mi sbaglio.

Sarebbe davvero bello se qualcuno potesse indicarmi cosa c'è che non va. Ho provato a pulire usando airflow testing clear poi alla stessa cosa. Ha funzionato in prima istanza e poi si è fermato lì.

L'unica cosa che la linea di comando mostra è Getting all instance for DAG testing

Quando cambio della posizione del schedule_interval appena eseguito con qualsiasi orario intervallo parallel.That è con in 5 minuti 300 o più esempio attività è stata completata. Non v'è alcun 5 minuti intervallo di pianificazione

Codice 2:

from airflow import DAG 
from airflow.operators import BashOperator 
from datetime import datetime, timedelta 


default_args = { 
'owner': 'Vignesh', 
'depends_on_past': False, 
'start_date': datetime(2015,10,13), 
'email': ['[email protected]'], 
'email_on_failure': True, 
'email_on_retry': True, 
'retries': 1, 
'retry_delay': timedelta(minutes=5), 
} 
dag = DAG('testing',schedule_interval=timedelta(minutes=5),default_args=default_args)#Schedule here 
run_this_first = BashOperator(task_id='Start1',bash_command='date', dag=dag) 
for i in range(5): 
    t = BashOperator(task_id="Orders1"+str(i), bash_command='sleep 5',dag=dag) 
    t.set_upstream(run_this_first) 

Grazie Vignesh,

risposta

4

Per il codice 2, credo che il motivo per cui viene eseguito ogni minuto è:

  1. L'ora di inizio è 2015-10-13 00:00

  2. L'intervallo di pianificazione è a 5 minuti

  3. ogni palpito di scheduler (5 secondi per impostazione predefinita), il vostro DAG sarà controllato

    • Primo controllo: data di inizio (non ultima data di esecuzione trovati) + scheduler intervallo < ora corrente? In caso affermativo, il DAG verrà eseguito e verrà registrato il tempo di esecuzione dell'ultimo . (es .: 2015-10-13 00:00 + 5min < corrente?)
    • Secondo controllo sul prossimo battito: ultimo tempo di esecuzione + scheduler intervallo < ora corrente? In tal caso, il DAG verrà eseguito nuovamente.
    • ....

La soluzione è impostare la start_date DAG come datetime.now() - schedule_interval.

E anche se si desidera eseguire il debug:

  1. Impostazione del LoggingLevel a debug in settings.py

  2. Modifica metodo di classe is_queueable() di airflow.models.TaskInstance per

:

def is_queueable(self, flag_upstream_failed=False): 
    logging.debug('Checking whether task instance is queueable or not!') 
    if self.execution_date > datetime.now() - self.task.schedule_interval: 
     logging.debug('Too early to execute: execution_date {0} + task.schedule_interval {1} > datetime.now() {2}'.format(self.execution_date, self.task.schedule_interval, datetime.now())) 
     return False 
     ... 
+0

quindi stai dicendo che verrà eseguito ogni cinque secondi finché la data di esecuzione non assorbe la data corrente ti dopo di che seguirà l'intervallo di tempo pianificato – The6thSense

+0

Sì, è quello che intendo. – Yongyiw

+0

grazie amico, ma ho due dubbi. Come posso programmare un compito a partire da questo secondo con un intervallo di programmazione di un'ora. Posso programmare un lavoro per il futuro – The6thSense

3

Poiché l'ora di avvio (2015-10-13 00:00) è inferiore a ora, attiva il flusso d'aria backfill. Funzionerà dal 2015-10-13 alle 00:00 quando ogni secondi viene rilevato lo scheduler del flusso d'aria (è la data di inizio), ma la data di esecuzione è compresa tra 5 minuti (intervallo di attività).

vedere il nome di registro:

$tree airflow/logs/testing/ 
testing/ 
|-- Orders10 
| |-- 2015-10-13T00:00:00 
| |-- 2015-10-13T00:05:00 
| -- 2015-10-13T00:10:00 
|-- Orders11 
| |-- 2015-10-13T00:00:00 
| |-- 2015-10-13T00:05:00 
| -- 2015-10-13T00:10:00 
|-- Orders12 
| |-- 2015-10-13T00:00:00 
| |-- 2015-10-13T00:05:00 
| -- 2015-10-13T00:10:00 
|-- Orders13 
| |-- 2015-10-13T00:00:00 
| |-- 2015-10-13T00:05:00 
| -- 2015-10-13T00:10:00 
|-- Orders14 
| |-- 2015-10-13T00:00:00 
| |-- 2015-10-13T00:05:00 
| -- 2015-10-13T00:10:00 
-- Start1 
    |-- 2015-10-13T00:00:00 
    |-- 2015-10-13T00:05:00 
    |-- 2015-10-13T00:10:00 
    -- 2015-10-13T00:15:00 

vedere il tempo creare dei log:

$ll airflow/logs/testing/Start1 
-rw-rw-r-- 1 admin admin 4192 Nov 9 14:50 2015-10-13T00:00:00 
-rw-rw-r-- 1 admin admin 4192 Nov 9 14:50 2015-10-13T00:05:00 
-rw-rw-r-- 1 admin admin 4192 Nov 9 14:51 2015-10-13T00:10:00 
-rw-rw-r-- 1 admin admin 4192 Nov 9 14:52 2015-10-13T00:15:00 

Inoltre, è possibile vedere le istanze di attività sul web UI:

air flow Task Instances

+0

sì hai ragione – The6thSense

Problemi correlati