Branching using operators - Apache Airflow Tutorial
From the course: Apache Airflow Essential Training
- [Instructor] In this demo, we'll see how we can perform branching operations using Python operators, specifically the BranchPythonOperator, and then we'll rewrite this DAG to perform branching using the TaskFlow API. Now, this pipeline is going to work on some real data, so under my Airflow folder I'm going to create a new sub-folder called datasets. In this sub-folder I'm going to store a CSV file that we'll be using. The dataset we'll use for our branching operation is called car_data.csv and, like the name suggests, it contains details about some cars. Let's open this up, and this is what the data looks like: we have the brand, model, range, and so on, a bunch of details about cars. We'll perform some processing on this car data, and I'm going to store the output under another sub-folder called output. So under my Airflow install directory, I'm going to have an output folder, which is where our final transformed result will be stored.

I've stopped the Airflow scheduler and web server because I want to install pandas in my virtual environment. Your virtualenv needs to have pandas installed in order to perform the data transformations in this demo, so make sure you install pandas, and then you can restart the scheduler as well as the web server. My scheduler is up and running here, so I'm going to switch over to the tab where I had the web server running and restart the web server as well. Now that I have my environment set up, I'm ready to run my data transformation DAG that contains a branching operator.

Back to the code, where I've set up a new DAG called branching_using_operators. I've defined a number of Python functions for the different steps in my data transformation pipeline. I have a function read_csv_file, which reads the car_data.csv file and then returns a JSON representation of that file, df.to_json. I have the determine_branch Python function, which accesses the transform variable to see what the final transformed output should be. If the transform variable is set to filter_two_seaters, we'll execute the filter_two_seaters task. Otherwise, if it's set to filter_fwds, we'll execute the filter_fwds task; that's a simple if-else statement.

And here I have the Python callables for the two tasks, filter_two_seaters and filter_fwds. Both of them take a task instance as an input argument. Within the filter_two_seaters function, I use ti.xcom_pull on line 33 to read in the JSON data returned by the read_csv_file task. We call read_json to get a DataFrame and filter all of the records in the DataFrame where seats equal two; this is on line 37. On lines 39 and 40, I use xcom_push to push two bits of information: the transform result, that is, the two-seater records, and the transform file name. The output file should be called two_seaters.csv. filter_fwds, which stands for filter front-wheel drives, does something very similar: xcom_pull to read the JSON data, the filtering on line 48, and xcom_push to push the transform result as well as the transform file name. Then in the write_csv_result function, which also takes in a task instance as an input argument, we access the transform result and transform file name using xcom_pull and write out the final result; that's the to_csv call on lines 61 and 62. On lines 66 through 73, I use the with statement to set up the DAG and create PythonOperators for all of the Python callables.
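To make the structure concrete, here is a minimal sketch of a DAG along these lines. The file paths, the column names (Seats, PowerTrain), and the exact Variable values are assumptions reconstructed from the narration rather than the instructor's exercise files, so treat this as an illustration:

```python
# Hypothetical reconstruction of the branching pipeline described above.
# Paths and column names ('Seats', 'PowerTrain') are assumptions, not the
# instructor's exact code.
from datetime import datetime
from io import StringIO

import pandas as pd

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import BranchPythonOperator, PythonOperator

DATASET_PATH = '/home/airflow/datasets/car_data.csv'  # assumed location
OUTPUT_DIR = '/home/airflow/output'                   # assumed location


def read_csv_file():
    df = pd.read_csv(DATASET_PATH)
    return df.to_json()  # the return value is pushed to XCom automatically


def determine_branch():
    # The 'transform' Airflow Variable decides which filtering branch runs.
    transform = Variable.get('transform', default_var=None)
    if transform == 'filter_two_seaters':
        return 'filter_two_seaters'
    elif transform == 'filter_fwds':
        return 'filter_fwds'
    # Returning None skips both branches in recent Airflow versions.


def filter_two_seaters(ti):
    json_data = ti.xcom_pull(task_ids='read_csv_file')
    df = pd.read_json(StringIO(json_data))
    result = df[df['Seats'] == 2]                 # 'Seats' column is assumed
    ti.xcom_push(key='transform_result', value=result.to_json())
    ti.xcom_push(key='transform_filename', value='two_seaters')


def filter_fwds(ti):
    json_data = ti.xcom_pull(task_ids='read_csv_file')
    df = pd.read_json(StringIO(json_data))
    result = df[df['PowerTrain'] == 'FWD']        # 'PowerTrain' column is assumed
    ti.xcom_push(key='transform_result', value=result.to_json())
    ti.xcom_push(key='transform_filename', value='fwds')


def write_csv_result(ti):
    # Pull from whichever filter branch actually ran; the other was skipped.
    json_data = ti.xcom_pull(key='transform_result')
    filename = ti.xcom_pull(key='transform_filename')
    df = pd.read_json(StringIO(json_data))
    df.to_csv(f'{OUTPUT_DIR}/{filename}.csv', index=False)


with DAG(
    dag_id='branching_using_operators',
    start_date=datetime(2024, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    read_csv = PythonOperator(task_id='read_csv_file',
                              python_callable=read_csv_file)
    branch = BranchPythonOperator(task_id='determine_branch',
                                  python_callable=determine_branch)
    two_seaters = PythonOperator(task_id='filter_two_seaters',
                                 python_callable=filter_two_seaters)
    fwds = PythonOperator(task_id='filter_fwds',
                          python_callable=filter_fwds)
    write_result = PythonOperator(task_id='write_csv_result',
                                  python_callable=write_csv_result,
                                  trigger_rule='none_failed')

    read_csv >> branch >> [two_seaters, fwds] >> write_result
```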
The determine_branch task, of course, uses the BranchPythonOperator. You know the operator setup well by now, so I don't need to explain it here. There is one detail to note about the write_csv_result task: notice the trigger rule specified, none_failed. If I ran this DAG without specifying the trigger rule, this write task would be skipped if any of its upstream tasks were skipped. Because we have a branching operator, there will always be one upstream task that is skipped, either filter_two_seaters or filter_fwds. That means this write task would never execute unless I have the none_failed trigger rule; that's why it's there. On lines 101 and 102, you can see how I set up the dependencies for this DAG.

Now let's head over to the Airflow UI. I'll need to configure a variable before we can actually run this code. Go to Admin > Variables, and here we'll set up the transform variable. I'm going to open this up in a new tab so that it's easier for us to work with. I have no variables configured at this point in time. Click on the plus button here, and this will allow me to configure a new variable. The key for the variable is transform and the value is filter_two_seaters, so we'll execute the two-seater branch in our first run of this DAG.

Now let's head back to the Airflow UI, and since I've gotten rid of all of the other DAGs that I had here, we have only branching_using_operators. Let's click through, and here you can see the grid view of the DAG, with all of the tasks at the bottom left. Let's go to the graph view and make sure that the dependencies have been set up correctly. Here is what our DAG looks like. The transformation that runs will be based on the variable we've configured; the transform will initially filter two-seaters. Go ahead and unpause this DAG and let it run through, and you can see that the filter_two_seaters task was the one that executed, and write_csv_result executed as well. I'll leave it to you to explore the logs and the XComs for the individual tasks; you know how to do that. Let's go directly to the output folder. Here in the output folder, you should find two_seaters.csv written out. Let's open up this file, and here you can see the two records for two-seaters in our dataset. Clearly, branching with operators works just fine.
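As an aside, if you'd rather script this setup than click through Admin > Variables, the same Variable can be set from Python. This is a minimal alternative sketch, not something shown in the video; the `airflow variables set` CLI command is another option:

```python
# Programmatic equivalent of creating the Variable in Admin > Variables.
from airflow.models import Variable

Variable.set('transform', 'filter_two_seaters')  # first run: two-seater branch
# Variable.set('transform', 'filter_fwds')       # switch branches for a later run
```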
Contents
- Branching using operators (6m 18s)
- Branching using the TaskFlow API (4m 45s)
- Complete branching pipeline using TaskFlow (3m 17s)
- Interoperating and passing data between operators and TaskFlow (4m 53s)
- Performing SQL operations with TaskFlow (4m 53s)
- Data transformation and storage using TaskFlow (4m 23s)