
Integrating Airflow with Databricks: Creating Custom Operators

By Phil Dakin, ITNEXT

Apache Airflow provides robust capabilities for designing and managing workflows. However, there are times when external integrations require a more tailored approach than what's available out of the box. This article focuses on the practical implementation of custom Airflow operators, using Databricks integration as a case study. We'll create a custom operator, then make it deferrable for better resource utilization.

To follow this example, you will need a running Airflow environment, a Databricks workspace with a job you can trigger, and the Databricks Python SDK installed.

The best practice for interacting with an external service from Airflow is the Hook abstraction. Hooks provide a unified interface for acquiring connections and integrate with Airflow's built-in connection management. We'll create a hook for connecting to Databricks via the Databricks Python SDK.
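
Here is a minimal sketch of such a hook, assuming Airflow 2.x and the databricks-sdk package; the class name, the databricks_default connection id, and the run_now_and_wait helper are illustrative stand-ins rather than the article's exact code:

```python
# Illustrative sketch: a hook that builds a Databricks SDK client from an Airflow connection.
from airflow.hooks.base import BaseHook
from databricks.sdk import WorkspaceClient


class DatabricksSdkHook(BaseHook):
    """Acquire a Databricks WorkspaceClient using Airflow's connection management."""

    def __init__(self, databricks_conn_id: str = "databricks_default"):
        super().__init__()
        self.databricks_conn_id = databricks_conn_id

    def get_conn(self) -> WorkspaceClient:
        # The workspace URL lives in the connection's host field, the personal access
        # token in its password field.
        conn = self.get_connection(self.databricks_conn_id)
        return WorkspaceClient(host=conn.host, token=conn.password)

    def run_now_and_wait(self, job_id: int):
        # run_now returns a waiter; result() blocks until the run reaches a terminal state.
        return self.get_conn().jobs.run_now(job_id=job_id).result()
```
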
Operators are the tasks we run in our workflows. Let's use our hook to access the Databricks API, submit a blocking request to run a job, and determine the state of our task based on the returned run state. If the run has failed, we can indicate an operator failure to the framework by raising an AirflowException.
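
A sketch of such an operator, building on the hook above (again, the names are illustrative):

```python
# Illustrative sketch of a blocking "run now" operator built on the DatabricksSdkHook above.
from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from databricks.sdk.service.jobs import RunResultState


class DatabricksRunNowOperator(BaseOperator):
    """Trigger a Databricks job and block until the run finishes."""

    def __init__(self, job_id: int, databricks_conn_id: str = "databricks_default", **kwargs):
        super().__init__(**kwargs)
        self.job_id = job_id
        self.databricks_conn_id = databricks_conn_id

    def execute(self, context):
        hook = DatabricksSdkHook(self.databricks_conn_id)
        # Occupies the worker slot until the Databricks run reaches a terminal state.
        run = hook.run_now_and_wait(self.job_id)
        if run.state.result_state != RunResultState.SUCCESS:
            raise AirflowException(
                f"Databricks run {run.run_id} finished with state {run.state.result_state}"
            )
```
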
Our analytics workload on Databricks could potentially take a long time to execute. In the previous implementation, we block an Airflow worker while the task waits for run_now_and_wait to return. To alleviate this, we can use the Airflow concept of a "deferrable" operator. These operators can pause their work and free up worker resources until an external event occurs, at which point they resume. The code below introduces two main components: a trigger that polls the Databricks run from the triggerer process, and a deferrable version of our operator that submits the run and then defers to that trigger. This approach can save resources in Airflow, since the operator isn't constantly occupying a worker; instead, it waits for a signal from the trigger to continue.
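
Here is a sketch of both pieces; the class names and the my_plugins.databricks_triggers module path in serialize() are placeholders for wherever you keep your trigger code:

```python
# Illustrative sketch of a trigger plus a deferrable operator.
import asyncio

from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.triggers.base import BaseTrigger, TriggerEvent
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.jobs import RunLifeCycleState, RunResultState


class DatabricksRunTrigger(BaseTrigger):
    """Polls a Databricks run from the triggerer until it reaches a terminal state."""

    def __init__(self, run_id: int, host: str, token: str, poll_interval: int = 30):
        super().__init__()
        self.run_id = run_id
        self.host = host
        self.token = token
        self.poll_interval = poll_interval

    def serialize(self):
        # Triggers must be serializable so the triggerer process can re-instantiate them.
        return (
            "my_plugins.databricks_triggers.DatabricksRunTrigger",
            {
                "run_id": self.run_id,
                "host": self.host,
                "token": self.token,
                "poll_interval": self.poll_interval,
            },
        )

    async def run(self):
        client = WorkspaceClient(host=self.host, token=self.token)
        terminal = {
            RunLifeCycleState.TERMINATED,
            RunLifeCycleState.INTERNAL_ERROR,
            RunLifeCycleState.SKIPPED,
        }
        while True:
            # Note: get_run is a synchronous SDK call invoked from an async method;
            # this is the source of the blocked-thread warnings discussed below.
            run = client.jobs.get_run(run_id=self.run_id)
            if run.state.life_cycle_state in terminal:
                yield TriggerEvent({
                    "run_id": self.run_id,
                    "success": run.state.result_state == RunResultState.SUCCESS,
                })
                return
            await asyncio.sleep(self.poll_interval)


class DatabricksRunNowDeferrableOperator(BaseOperator):
    """Submits the run, then defers to the trigger instead of occupying a worker slot."""

    def __init__(self, job_id: int, databricks_conn_id: str = "databricks_default", **kwargs):
        super().__init__(**kwargs)
        self.job_id = job_id
        self.databricks_conn_id = databricks_conn_id
        self.run_id = None

    def execute(self, context):
        hook = DatabricksSdkHook(self.databricks_conn_id)
        conn = hook.get_connection(self.databricks_conn_id)
        # The run-now response carries the id of the newly submitted run.
        self.run_id = hook.get_conn().jobs.run_now(job_id=self.job_id).response.run_id
        self.defer(
            trigger=DatabricksRunTrigger(run_id=self.run_id, host=conn.host, token=conn.password),
            method_name="execute_complete",
        )

    def execute_complete(self, context, event):
        # Resumed by Airflow once the trigger fires its TriggerEvent.
        if not event["success"]:
            raise AirflowException(f"Databricks run {event['run_id']} failed")
```
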
While running this operator, you may notice warnings in the triggerer logs about its async thread being blocked. This is because get_run is not an async function, yet it is invoked from our trigger's async run method. In a production setting, it would be best practice to make this path fully asynchronous; for the sake of this article, we will use the Databricks SDK as is. If you're curious, you can see an asynchronous Databricks API client in the official Airflow integration.

The operators above work in the happy path. But what happens when a user issues a termination from the Airflow UI? The BaseOperator provides an overridable method, on_kill, for these cases. When integrating with Databricks, we'll need to pass this kill signal through to cancel our job run.
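
One way to do this, sketched here as a revision of the blocking operator; it assumes the SDK's run_now waiter exposes the run-now response (which carries the run id) so we can remember it before waiting:

```python
# Illustrative sketch: the blocking operator revised to remember its run id so that
# on_kill can cancel the Databricks run when the task is terminated from the UI.
from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from databricks.sdk.service.jobs import RunResultState


class DatabricksRunNowOperator(BaseOperator):

    def __init__(self, job_id: int, databricks_conn_id: str = "databricks_default", **kwargs):
        super().__init__(**kwargs)
        self.job_id = job_id
        self.databricks_conn_id = databricks_conn_id
        self.run_id = None

    def execute(self, context):
        client = DatabricksSdkHook(self.databricks_conn_id).get_conn()
        waiter = client.jobs.run_now(job_id=self.job_id)
        self.run_id = waiter.response.run_id  # remember the run for on_kill
        run = waiter.result()  # block until the run reaches a terminal state
        if run.state.result_state != RunResultState.SUCCESS:
            raise AirflowException(
                f"Databricks run {self.run_id} finished with state {run.state.result_state}"
            )

    def on_kill(self):
        # Invoked by Airflow when the task is externally terminated; forward the
        # cancellation to Databricks so the job run does not keep running.
        if self.run_id is not None:
            client = DatabricksSdkHook(self.databricks_conn_id).get_conn()
            client.jobs.cancel_run(run_id=self.run_id)
            self.log.info("Cancelled Databricks run %s", self.run_id)
```
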
With the operators written, let's test them. First, set up the connection credentials in your environment.
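
One lightweight way to do this for local testing, shown purely for illustration (Airflow resolves connections from AIRFLOW_CONN_<CONN_ID> environment variables, and the JSON-serialized form requires Airflow 2.3+; the connection id matches the hook sketch above):

```python
import json
import os

# Replace the placeholders with your workspace URL and a personal access token.
# The variable must be visible to the scheduler, workers, and triggerer.
os.environ["AIRFLOW_CONN_DATABRICKS_DEFAULT"] = json.dumps({
    "conn_type": "generic",
    "host": "https://<your-workspace>.cloud.databricks.com",
    "password": "<personal-access-token>",
})
```
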
Note that in production, a secrets manager should be used for the credentials. Now we can define a DAG for testing our operators.
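
A minimal sketch of such a DAG, assuming Airflow 2.4+ for the schedule argument; the dag_id, task ids, and job_id are placeholders:

```python
from datetime import datetime

from airflow import DAG

with DAG(
    dag_id="databricks_custom_operators_demo",
    start_date=datetime(2023, 1, 1),
    schedule=None,  # trigger manually from the UI
    catchup=False,
) as dag:
    # job_id=123 stands in for an existing Databricks job in your workspace.
    blocking_run = DatabricksRunNowOperator(task_id="blocking_run", job_id=123)
    deferrable_run = DatabricksRunNowDeferrableOperator(task_id="deferrable_run", job_id=123)
```
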
Bada-bing, bada-boom. Now you're cooking with gas!

There are a variety of things left unhandled in our basic implementation. If you're curious about them, I recommend taking a look at the Airflow repo's Databricks integration.
