Streamlining Apache HOP Workflow Management with Apache Airflow

Introduction

In our previous blog, we covered Apache HOP in detail. In case you missed it, you can read it here: https://analytics.axxonet.com/comparison-of-and-migrating-from-pdi-kettle-to-apache-hop/. As a continuation of the Apache HOP article series, this post covers how to integrate Apache Airflow with Apache HOP. In the fast-paced world of data engineering and data science, efficiently managing complex workflows is crucial. Apache Airflow, an open-source platform for programmatically authoring, scheduling, and monitoring workflows, has become a cornerstone in many data teams’ toolkits. This blog post explores what Apache Airflow is, its key features, and how you can leverage it to streamline and manage your Apache HOP workflows and pipelines.

Apache HOP

Apache HOP is an open-source data integration and orchestration platform. For more details, refer to our previous blog here.

Apache Airflow

Apache Airflow is an open-source workflow orchestration tool originally developed by Airbnb. It allows you to define workflows as code, providing a dynamic, extensible platform to manage your data pipelines. Airflow’s rich features enable you to automate and monitor workflows efficiently, ensuring that data moves seamlessly through various processes and systems.

Use Cases:

  1. Data Pipelines: Orchestrating ETL jobs to extract data from sources, transform it, and load it into a data warehouse.
  2. Machine Learning Pipelines: Scheduling ML model training, batch processing, and deployment workflows.
  3. Task Automation: Running repetitive tasks, like backups or sending reports.

DAG (Directed Acyclic Graph):

  1. A DAG represents the workflow in Airflow. It defines a collection of tasks and their dependencies, ensuring that tasks are executed in the correct order.
  2. DAGs are written in Python and allow you to define the tasks and how they depend on each other.

Operators:

  • Operators define a single task in a DAG. There are several built-in operators, such as:
    1. BashOperator: Runs a bash command.
    2. PythonOperator: Runs Python code.
    3. SqlOperator: Executes SQL commands.
    4. HttpOperator: Makes HTTP requests.

Custom operators can also be created to meet specific needs.
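
To illustrate, here is a minimal sketch of a custom operator, assuming Airflow 2.x; the class name and message parameter are hypothetical:

from airflow.models.baseoperator import BaseOperator

class PrintMessageOperator(BaseOperator):
    # Hypothetical operator that simply logs a message when the task runs
    def __init__(self, message: str, **kwargs):
        super().__init__(**kwargs)
        self.message = message

    def execute(self, context):
        # execute() is called by the worker when the task instance runs
        self.log.info("Message: %s", self.message)
        return self.message

It can then be used in a DAG like any built-in operator, for example PrintMessageOperator(task_id='say_hello', message='Hello', dag=dag).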

Tasks:

  1. Tasks are the building blocks of a DAG. Each node in a DAG is a task that does a specific unit of work, such as executing a script or calling an API.
  2. Tasks are defined by operators and their dependencies are controlled by the DAG.

Schedulers:

  1. The scheduler is responsible for triggering tasks at the appropriate time, based on the schedule_interval defined in the DAG.
  2. It continuously monitors all DAGs and determines when to run the next task.

Executors:

  • The executor is the mechanism that runs the tasks. Airflow supports different types of executors:
    1. SequentialExecutor: Executes tasks one by one.
    2. LocalExecutor: Runs tasks in parallel on the local machine.
    3. CeleryExecutor: Distributes tasks across multiple worker machines.
    4. KubernetesExecutor: Runs tasks in a Kubernetes cluster.

Web UI:

  1. Airflow has a web-based UI that lets you monitor the status of DAGs, view logs, and check the status of each task in a DAG.
  2. It also provides tools to trigger, pause, or retry DAGs.

Key Features of Apache Airflow

Workflow as Code

Airflow uses Directed Acyclic Graphs (DAGs) to represent workflows. These DAGs are written in Python, allowing you to leverage the full power of a programming language to define complex workflows. This approach, known as “workflow as code,” promotes reusability, version control, and collaboration.

Dynamic Task Scheduling

Airflow’s scheduling capabilities are highly flexible. You can schedule tasks to run at specific intervals, handle dependencies, and manage task retries in case of failures. The scheduler executes tasks in a defined order, ensuring that dependencies are respected and workflows run smoothly.
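
For example, a schedule and retry policy can be declared directly in the DAG definition; this is a minimal sketch assuming Airflow 2.x, with an illustrative DAG id and task:

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    'retry_demo',                          # illustrative DAG id
    start_date=datetime(2023, 1, 1),
    schedule_interval='@hourly',           # run once every hour
    catchup=False,
) as dag:
    extract = BashOperator(
        task_id='extract',
        bash_command='echo "extracting..."',
        retries=3,                         # retry up to 3 times on failure
        retry_delay=timedelta(minutes=5),  # wait 5 minutes between attempts
    )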

Extensible Architecture

Airflow’s architecture is modular and extensible. It supports a wide range of operators (pre-defined tasks), sensors (waiting for external conditions), and hooks (interfacing with external systems). This extensibility allows you to integrate with virtually any system, including databases, cloud services, and APIs.
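
As an illustration, a sensor can hold a pipeline until an external condition is met; the file path below is hypothetical, and the sketch assumes Airflow 2.x with the default fs_default filesystem connection:

from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.sensors.filesystem import FileSensor

def process_file():
    # Placeholder for the actual processing logic
    print('File found, processing it...')

with DAG('sensor_demo', start_date=datetime(2023, 1, 1),
         schedule_interval='@daily', catchup=False) as dag:
    wait_for_file = FileSensor(
        task_id='wait_for_file',
        filepath='/data/incoming/users.csv',  # hypothetical path
        poke_interval=60,                     # check for the file every 60 seconds
    )
    process = PythonOperator(task_id='process_file', python_callable=process_file)
    wait_for_file >> process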

Robust Monitoring and Logging

Airflow provides comprehensive monitoring and logging capabilities. The web-based user interface (UI) offers real-time visibility into the status of your workflows, enabling you to monitor task progress, view logs, and troubleshoot issues. Additionally, Airflow can send alerts and notifications based on task outcomes.
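
For example, alerts can be attached per task through callbacks or email settings; this is a hedged sketch, assuming an SMTP connection is configured in Airflow and that a DAG object named dag already exists:

from airflow.operators.bash import BashOperator

def notify_failure(context):
    # Hypothetical callback: log the failure; this could also post to Slack, etc.
    ti = context['task_instance']
    print(f"Task {ti.task_id} failed for run date {context['ds']}")

load = BashOperator(
    task_id='load_data',
    bash_command='exit 1',                # deliberately fails to trigger the alert
    email_on_failure=True,                # requires SMTP to be configured
    email=['alerts@example.com'],         # hypothetical recipient
    on_failure_callback=notify_failure,
    dag=dag,                              # assumes an existing DAG object
)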

Scalability and Reliability

Designed to scale, Airflow can handle workflows of any size. It supports distributed execution, allowing you to run tasks on multiple workers across different nodes. This scalability ensures that Airflow can grow with your organization’s needs, maintaining reliability even as workflows become more complex.

Getting Started with Apache Airflow

Installation using PIP

Setting up Apache Airflow is straightforward. You can install it using pip, Docker, or by deploying it on a cloud service. Here’s a brief overview of the installation process using pip:

1. Create a Virtual Environment (optional but recommended):

           python3 -m venv airflow_env

            source airflow_env/bin/activate

2. Install Apache Airflow: 

           pip install apache-airflow

3. Initialize the Database:

           airflow db init

4. Create a User:

           airflow users create --username admin --password admin --firstname Admin --lastname User --role Admin --email admin@example.com

5. Start the Web Server and Scheduler:

           airflow webserver --port 8080

           airflow scheduler

6. Access the Airflow UI: Open your web browser and go to http://localhost:8080.

Installation using Docker

Pull the official Airflow Docker image and run the container to access the Airflow web UI. Refer to the official Airflow Docker installation guide for more details.

Creating Your First DAG

Airflow DAG Structure:

A DAG in Airflow is composed of three main parts:

  1. Imports: Necessary packages and operators.
  2. Default Arguments: Arguments that apply to all tasks within the DAG (such as retries, owner, start date).
  3. Task Definition: Define tasks using operators, and specify dependencies between them.

Scheduling:

Airflow allows you to define the schedule of a DAG using schedule_interval:

  1. @daily: Run once a day at midnight.
  2. @hourly: Run once every hour.
  3. @weekly: Run once a week at midnight on Sunday.
  4. Cron expressions, like "0 12 * * *", are also supported for more specific scheduling needs.

1. Define the DAG: Create a Python file (e.g., run_lms_transaction.py) in the dags folder of your Airflow installation directory.

    Example

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from datetime import datetime

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2023, 1, 1),
    'retries': 1,
}

dag = DAG('example_dag', default_args=default_args, schedule_interval='@daily')

start = DummyOperator(task_id='start', dag=dag)
end = DummyOperator(task_id='end', dag=dag)

start >> end

2. Deploy the DAG: Save the file in the DAGs folder (~/airflow/dags by default). Airflow will automatically detect and load the DAG.

3. Monitor the DAG: Access the Airflow UI, where you can view and manage the newly created DAG. Trigger the DAG manually or wait for it to run according to the defined schedule.

Calling the Apache HOP Pipelines/Workflows from Apache Airflow

In this example, we walk through how to integrate Apache HOP with Apache Airflow. Here, Apache Airflow and Apache HOP run in two separate, independent Docker containers. The Apache HOP ETL pipelines/workflows are stored on a persistent volume so that the DAG code in Airflow can reference and execute them.

Steps

  1. Define the DAG: Create a Python file (e.g., Stg_User_Details.py) in the dags folder of your Airflow installation directory.

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.docker_operator import DockerOperator
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator
from docker.types import Mount

default_args = {
    'owner'            : 'airflow',
    'description'      : 'Stg_User_details',
    'depends_on_past'  : False,
    'start_date'       : datetime(2022, 1, 1),
    'email_on_failure' : False,
    'email_on_retry'   : False,
    'retries'          : 1,
    'retry_delay'      : timedelta(minutes=5)
}

with DAG('Stg_User_details', default_args=default_args, schedule_interval='0 10 * * *', catchup=False, is_paused_upon_creation=False) as dag:

    start_dag = DummyOperator(
        task_id='start_dag'
    )

    end_dag = DummyOperator(
        task_id='end_dag'
    )

    hop = DockerOperator(
        task_id='Stg_User_details',
        # Use the Apache Hop Docker image. Add your tag here in the default apache/hop:<tag> syntax
        image='test',
        api_version='auto',
        auto_remove=True,
        environment={
            'HOP_RUN_PARAMETERS': 'INPUT_DIR=',
            'HOP_LOG_LEVEL': 'TRACE',
            'HOP_FILE_PATH': '/opt/hop/config/projects/default/stg_user_details_test.hpl',
            'HOP_PROJECT_DIRECTORY': '/opt/hop/config/projects/',
            'HOP_PROJECT_NAME': 'ISON_Project',
            'HOP_ENVIRONMENT_NAME': 'ISON_Env',
            'HOP_ENVIRONMENT_CONFIG_FILE_NAME_PATHS': '/opt/hop/config/projects/default/project-config.json',
            'HOP_RUN_CONFIG': 'local',
        },
        docker_url="unix://var/run/docker.sock",
        network_mode="bridge",
        force_pull=False,
        mount_tmp_dir=False
    )

    start_dag >> hop >> end_dag

Note: For reference purposes only.

2. Deploy the DAG: Save the file in the dags folder. Airflow will automatically detect and load the DAG.

After successful deployment, the new “Stg_User_Details” DAG appears in the Active and All tabs of the Airflow portal.

3. Run the DAG: Trigger the pipeline or workflow from the Airflow UI by clicking the Trigger DAG option for the “Stg_User_Details” DAG.

4. Monitor the DAG: Access the Airflow UI, where you can view and manage the newly created DAG. Trigger the DAG manually or wait for it to run according to the defined schedule.

After successful execution, the “Stg_User_Details” DAG shows a success status in the Airflow portal, along with its execution history and log details.

Managing and Scaling Workflows

  1. Use Operators and Sensors: Leverage Airflow’s extensive library of operators and sensors to create tasks that interact with various systems and handle complex logic.
  2. Implement Task Dependencies: Define task dependencies using the >> and << operators to ensure tasks run in the correct order (see the sketch after this list).
  3. Optimize Performance: Monitor task performance through the Airflow UI and logs. Adjust task configurations and parallelism settings to optimize workflow execution.
  4. Scale Out: Configure Airflow to run in a distributed mode by adding more worker nodes, ensuring that the system can handle increasing workload efficiently.
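
As a quick sketch of point 2 above, dependencies can also fan out into parallel tasks and fan back in; the DAG and task names below are hypothetical:

from datetime import datetime
from airflow import DAG
from airflow.operators.dummy import DummyOperator

with DAG('fan_out_demo', start_date=datetime(2023, 1, 1),
         schedule_interval='@daily', catchup=False) as dag:
    start = DummyOperator(task_id='start')
    load_users = DummyOperator(task_id='load_users')
    load_orders = DummyOperator(task_id='load_orders')
    end = DummyOperator(task_id='end')

    # start fans out to two parallel tasks, both of which must finish before end runs
    start >> [load_users, load_orders] >> end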

Conclusion

Apache Airflow is a powerful and versatile tool for managing workflows and automating complex data pipelines. Its “workflow as code” approach, coupled with robust scheduling, monitoring, and scalability features, makes it an essential tool for data engineers and data scientists. By adopting Airflow, you can streamline your workflow management, improve collaboration, and ensure that your data processes are efficient and reliable. Explore Apache Airflow today and discover how it can transform your data engineering workflows.

Streamline your Apache HOP workflow management with Apache Airflow through our team of experts.

Upcoming Apache HOP Blog Series

Stay tuned for the upcoming Apache HOP Blog Series:

  1. Migrating from Pentaho ETL to Apache Hop
  2. Integrating Apache Hop with an Apache Superset
  3. Comparison of Pentaho ETL and Apache Hop