Issue
This Content is from Stack Overflow. Question asked by user13274071
I am trying to send multiple files using Airflow's EmailOperator. The list of files is generated dynamically, and I use an XCom pull to get it from the previous task. For some reason, the EmailOperator's files parameter is not able to read the list.
Here is the DAG:
from airflow import DAG
from airflow.operators.email import EmailOperator
from airflow.operators.python import PythonOperator
import os
from datetime import datetime, timedelta
default_args = {
    "owner": 'TEST',
    "depends_on_past": False,
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 0,
}
with DAG(
    dag_id="test9_email_operator_dag",
    default_args=default_args,
    start_date=datetime(2022, 9, 14),
    end_date=datetime(2022, 9, 15),
    catchup=True,
    max_active_runs=1,
    schedule_interval="0 12 * * *",  # Runs every day @ 8AM EST
) as dag:

    def print_local_folder_files(local_temp_folder):
        print("local folder files => ", os.listdir(local_temp_folder))
        files_list = []
        for file in os.listdir(local_temp_folder):
            files_list.append(local_temp_folder + file)
        print("files_list => ", files_list)
        return files_list

    print_local_folder_files = PythonOperator(
        task_id='print_local_folder_files',
        python_callable=print_local_folder_files,
        op_kwargs={'local_temp_folder': "/usr/local/airflow/dags/temp_dir/"},
        do_xcom_push=True,
        provide_context=True,
        dag=dag)

    send_email = EmailOperator(
        task_id='send_email',
        to='test@gmail.com',
        subject='Test Email op Notification',
        html_content='Test email op notification email. ',
        files="{{ task_instance.xcom_pull(task_ids='print_local_folder_files') }}"
    )

    print_local_folder_files >> send_email
Solution
You pushed a list to XCom, but Jinja templates are rendered as strings by default, so what the files parameter receives is the string representation of the list. This is why you get only the first character when you try to read it: iterating over a string yields it character by character.
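The effect can be reproduced without Airflow. The following is a minimal sketch, not the operator's actual code: the file paths and the first_item helper are hypothetical, and str() stands in for what a string-rendered template hands to the files parameter.

```python
# Hypothetical paths standing in for the list returned by the upstream task.
files_list = ["/tmp/a.csv", "/tmp/b.csv"]


def first_item(files):
    """Return the first element a loop over `files` would see (illustrative helper)."""
    for item in files:
        return item
    return None


# A string-rendered template yields the text form of the list, not the list itself.
rendered = str(files_list)

print(type(rendered).__name__)   # the rendered value is a str
print(first_item(rendered))      # first character of the string: the opening bracket
print(first_item(files_list))    # first element of the real list: a file path
```

Looping over the rendered value walks through "[", "'", "/", and so on, which is why a single character shows up where a file path was expected.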
To solve your issue, set render_template_as_native_obj=True on the DAG object:
with DAG(
    dag_id="test9_email_operator_dag",
    ...,
    render_template_as_native_obj=True,
) as dag:
This tells the Jinja engine to render templates as native Python types, so you get a list rather than a string.
For more information check Airflow docs on this feature.
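The difference between the two rendering modes can be seen with Jinja alone. This is a rough sketch that assumes the jinja2 package is installed and that the native-object option corresponds to Jinja's NativeEnvironment; the file paths and the "files" variable name are made up for illustration.

```python
from jinja2 import Environment
from jinja2.nativetypes import NativeEnvironment

# Hypothetical paths standing in for the XCom value pushed by the upstream task.
files_list = ["/tmp/a.csv", "/tmp/b.csv"]
template = "{{ files }}"

# Default environment: the rendered result is always a string.
as_string = Environment().from_string(template).render(files=files_list)

# Native environment: a template made of a single expression keeps its Python type.
as_native = NativeEnvironment().from_string(template).render(files=files_list)

print(type(as_string).__name__)
print(type(as_native).__name__)
```

With the native environment the rendered value is a real list, so each element is a complete file path when it is iterated.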
This question was asked on Stack Overflow by user13274071 and answered by Elad Kalif. It is licensed under the terms of CC BY-SA 2.5, CC BY-SA 3.0, CC BY-SA 4.0.