09-29-2023 01:49 PM
The Fivetran REST API allows you to perform the majority of Fivetran actions programmatically, which makes it possible to integrate Fivetran into existing workflows such as those commonly found in Continuous Integration / Continuous Deployment (CI/CD) processes. For example, setting up a couple of connectors in the Fivetran UI is easy, but when hundreds or even thousands of connectors need to be created, the REST API makes it easy to programmatically deploy, configure, and maintain a multitude of connectors. In this article, we will dive in and build an end-to-end pipeline using code samples made available by Fivetran.
You’re an organization looking to adopt Fivetran to centralize data for all your departments in a particular data warehouse. Each department has a dedicated database with similarly structured schemas and tables.
For a single database, you’re able to set up a connector in the Fivetran UI; however, you soon realize you’re going to have to do this multiple times with small changes to the configuration. You need to plan for the possibility of adding more departments, or changing data warehouses as your organization matures.
To achieve this and ensure scalability, you decide to automate resource deployment, management, and pipeline execution. You envision an end-to-end process that would eliminate manual work by creating the group, destination, and webhook, deploying and configuring a connector per department database, triggering syncs, and executing a transformation.
Any implementation of the Fivetran REST API is going to begin with authentication using an API Key and API Secret Key. These keys are tied to the user that generates them, and the actions that can be performed using these keys are also tied to that user's Fivetran permissions. These keys can be generated in the API Keys section of your user menu in the Fivetran UI.
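Under the hood, the key pair is sent as an HTTP Basic Auth header: `Basic ` followed by the base64 encoding of `key:secret`. As a minimal sketch (the helper name and the credential values here are placeholders, not Fivetran-provided code), this is what `requests`' `HTTPBasicAuth` produces for you:

```python
import base64

def basic_auth_header(api_key: str, api_secret: str) -> str:
    """Build the Authorization header value the Fivetran API expects:
    'Basic ' + base64("<key>:<secret>")."""
    token = base64.b64encode(f"{api_key}:{api_secret}".encode("utf-8")).decode("ascii")
    return f"Basic {token}"

# Placeholder credentials for illustration only
header = basic_auth_header("my_api_key", "my_api_secret")
```

In practice you never build this by hand; passing `HTTPBasicAuth(api_key, api_secret)` to `requests` produces the same header.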
With these keys we can now begin leveraging the API Framework. While the API Framework referenced above is implemented in Python, the Fivetran REST API can be utilized in a multitude of different ways, such as raw HTTP calls with curl or Postman, or infrastructure-as-code tooling.
Interactions with any of these tools will require those API Keys. For now, we're going to focus on Python using the API Framework. To start, we import our dependencies and read the API keys from a configuration file:
import requests
from requests.auth import HTTPBasicAuth
import json
import colorama
from colorama import Fore, Back, Style
import time

#configuration file
config_path = '/config.json'
with open(config_path, "r") as config_file:
    y = json.loads(config_file.read())

api_key = y['API_KEY']
api_secret = y['API_SECRET']
a = HTTPBasicAuth(api_key, api_secret)
#Create a new group, destination, webhook, connectors, and execute a transformation.
def atlas(method, endpoint, payload):
    base_url = 'https://api.fivetran.com/v1'
    url = f'{base_url}/{endpoint}'
    try:
        if method == 'GET':
            response = requests.get(url, auth=a)
        elif method == 'POST':
            response = requests.post(url, json=payload, auth=a)
        elif method == 'PATCH':
            response = requests.patch(url, json=payload, auth=a)
        elif method == 'DELETE':
            response = requests.delete(url, auth=a)
        else:
            raise ValueError('Invalid request method.')
        response.raise_for_status()  # Raise exception for 4xx or 5xx responses
        return response.json()
    except requests.exceptions.RequestException as e:
        print(f'Request failed: {e}')
        return None
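Fivetran's list endpoints (groups, connectors in a group, and so on) are cursor-paginated: the response body carries the items for one page plus a `next_cursor` value when more pages exist. A small generator can hide that detail. This is a sketch of ours, not part of any Fivetran SDK; it takes a `fetch` callable (for example, a thin wrapper over `atlas`) so the paging logic stays testable, and it assumes the response shape `{"data": {"items": [...], "next_cursor": "..."}}`:

```python
def paginate(fetch, endpoint, limit=100):
    """Yield every item from a cursor-paginated Fivetran list endpoint.

    `fetch(endpoint, params)` is expected to return the decoded JSON body,
    i.e. a dict shaped like {"data": {"items": [...], "next_cursor": "..."}}.
    """
    cursor = None
    while True:
        params = {"limit": limit}
        if cursor:
            params["cursor"] = cursor
        body = fetch(endpoint, params)
        data = body.get("data", {})
        for item in data.get("items", []):
            yield item
        cursor = data.get("next_cursor")
        if not cursor:  # no cursor means this was the last page
            return
```

Wiring it up is a matter of passing a callable that appends the query parameters to the endpoint before calling the API.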
Once we’ve authenticated, we can now start to make the API calls to achieve the tasks we defined above. Using the sample code below we will break up our task into parts:
#Submit group
method = 'POST'
gendpoint = 'groups'
gpayload = {"name": "departments"}  # group name is illustrative
gresp = atlas(method, gendpoint, gpayload)
if gresp is not None:
    print(Fore.CYAN + 'Call: ' + method + ' ' + gendpoint + ' ' + str(gpayload))
    print(Fore.GREEN + 'Response: ' + gresp['code'])
    print(Fore.MAGENTA + str(gresp))

endpoint = 'destinations'
payload = {
    "group_id": gresp['data']['id'],
    "service": "big_query",
    "region": "US",
    "time_zone_offset": "-5",
    "config": {
        "project_id": "",
        "data_set_location": "US"
    }
}
#Submit destination
dresponse = atlas(method, endpoint, payload)
We’ve gone with a BigQuery destination, but the same can be implemented for any other supported Fivetran Destination.
The Webhook will ensure that we capture the events from our automation scripts, which in turn allows for external orchestration of other tools.
A successful run of ngrok should look something like this. The forwarding URL should be added to the webhooks section of the config.json file.
As we submit more API calls we can see webhooks coming in on our Flask server. Again, we’re just using this for development and testing purposes. These webhooks would eventually point to a permanent storage location such as SFTP or Cloud Storage solution, or they’re processed by another application that acts on the types of events received, like firing off a Slack message if a connector fails.
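Before an application acts on an incoming event, it should confirm the payload really came from Fivetran. Fivetran signs the raw request body with the webhook secret using HMAC-SHA256 and sends the digest in a signature header. The sketch below is ours (the function name is an assumption, and the exact header name and hex casing may differ from your account, so check the webhook documentation before relying on it):

```python
import hashlib
import hmac

def verify_webhook(secret: str, raw_body: bytes, signature_header: str) -> bool:
    """Recompute the HMAC-SHA256 of the raw request body and compare it to
    the signature Fivetran sent, in constant time."""
    expected = hmac.new(secret.encode("utf-8"), raw_body, hashlib.sha256).hexdigest()
    # Compare case-insensitively since the header may be upper- or lower-case hex
    return hmac.compare_digest(expected.upper(), signature_header.upper())
```

A Flask handler would call this with `request.get_data()` (the raw bytes, not the parsed JSON) and reject the request when it returns False.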
We create the webhook using a POST:
if dresponse is not None:
    print(Fore.CYAN + 'Call: ' + method + ' ' + endpoint + ' ' + str(payload))
    print(Fore.GREEN + 'Response: ' + dresponse['code'])
    print(Fore.MAGENTA + str(dresponse))

#New Webhook from response data
wgid = gresp['data']['id']
wmethod = 'POST'
wendpoint = 'webhooks/group/' + wgid
wpayload = {
    "url": "https://<your-ngrok-subdomain>.ngrok-free.app",  # ngrok forwarding URL
    "events": [
        "connection_successful",
        "connection_failure",
        "create_connector",
        "pause_connector",
        "resume_connector",
        "edit_connector",
        "delete_connector",
        "force_update_connector",
        "resync_connector",
        "resync_table"
    ]
}

#Submit Webhook
print(Fore.CYAN + "Submitting Webhook")
wresponse = atlas(wmethod, wendpoint, wpayload)
Next, we will add some logic to create and configure ‘n’ number of connectors in the new destination. To do so, we will take code from create_connector.py and api_interact_schema_edit.py.
#Create Connectors
p = y['T'] #source auth
new_schemas = ["t_400", "t_401", "t_402"] #connector names
smethod = 'POST'
sendpoint = 'connectors/'
for new_schema in new_schemas:
    spayload = {
        "service": "sql_server_rds",
        "group_id": wgid,
        "trust_certificates": True,
        "run_setup_tests": True,
        "paused": False,
        "pause_after_trial": True,
        "config": {
            "schema_prefix": new_schema,
            "host": "",
            "port": 1433,
            "database": "sqlserver",
            "user": "fivetran",
            "password": p
        }
    }

    #Submit Connectors
    print(Fore.CYAN + "Submitting Connector")
    cresponse = atlas(smethod, sendpoint, spayload)
Notice that we are looping over the databases we’ve defined in our config.json file so that each database is associated with its own connector. The database name is used as the connector name, and the landed schemas all use the database name as a prefix. In the end, we have an `hr` schema for each database in our data warehouse.
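Since the per-connector payloads differ only by `schema_prefix`, the loop is easy to factor into a pure function. The sketch below is ours (the function name and the host/credential values are placeholders), and it makes the pattern explicit and unit-testable:

```python
def build_connector_payloads(group_id, schemas, host, password,
                             service="sql_server_rds", database="sqlserver",
                             user="fivetran", port=1433):
    """Return one POST /v1/connectors payload per schema prefix."""
    return [
        {
            "service": service,
            "group_id": group_id,
            "trust_certificates": True,
            "run_setup_tests": True,
            "paused": False,
            "pause_after_trial": True,
            "config": {
                "schema_prefix": schema,   # becomes the landed schema prefix
                "host": host,
                "port": port,
                "database": database,
                "user": user,
                "password": password,
            },
        }
        for schema in schemas
    ]
```

Each returned dict can then be submitted in turn with the `atlas` helper, and adding a department becomes a one-line change to the schema list.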
If we need to configure the connectors with a particular set of schemas, tables, or columns we can add some logic like this:
#Configure the Schemas
#PATCH https://api.fivetran.com/v1/connectors/{connector_id}/schemas/{schema}
sgroup_id = wgid
ssmethod = 'PATCH'
ssendpoint = 'connectors/' + cresponse['data']['id'] + '/schemas/hr'
sspayload = {
    "enabled": True,
    "tables": {
        "employees": {
            "enabled": True
        },
        "events": {
            "enabled": False
        }
    }
}
sresponse = atlas(ssmethod, ssendpoint, sspayload)
For now we only care about the employees table from each database (department), so we’re going to update our connectors’ schemas to include only the employees table. To avoid interruptions, we first pause the connector, then make the schema change. Next, we’ll see how to kick off a sync with the Fivetran API.
Now we need to add logic to sync our new connectors and fire off a dbt transformation that will perform a union of our tables spread across the multiple databases we’ve synced.
u_5 = 'https://api.fivetran.com/v1/connectors/' + cresponse['data']['id'] + '/sync'
j = {"force": True} #initiate the sync
s = requests.post(u_5, auth=a, json=j)
print(Fore.GREEN + "Connector Sync Started")
Now that we’ve given the command to start our sync we can unpause our connector and it will immediately begin. When the sync completes for all of our connectors, we’ll have the data in our data warehouse and we can start to think about transformations and data modeling.
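Rather than assuming the sync has finished, a script can poll the connector until it leaves the syncing state; `GET /v1/connectors/{id}` reports the current state under `data.status.sync_state`. The polling sketch below is ours (the function name and the shape of the `fetch` callable are assumptions); it accepts a sleep function so the logic can be tested without the network:

```python
import time

def wait_for_sync(fetch, connector_id, poll_seconds=30, timeout_seconds=3600,
                  sleep=time.sleep):
    """Poll GET connectors/{id} until data.status.sync_state != 'syncing'.

    `fetch(endpoint)` must return the decoded JSON response body.
    Returns the final sync_state, or raises TimeoutError.
    """
    waited = 0
    while waited <= timeout_seconds:
        body = fetch(f"connectors/{connector_id}")
        state = body["data"]["status"]["sync_state"]
        if state != "syncing":
            return state
        sleep(poll_seconds)
        waited += poll_seconds
    raise TimeoutError(f"connector {connector_id} still syncing after {timeout_seconds}s")
```

With the `atlas` helper, `fetch` could simply be `lambda ep: atlas('GET', ep, None)`; a webhook-driven design (as above) avoids polling entirely.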
For this demonstration of our REST API, we’re going to skip directly to running our Transformation to keep things concise.
#Execute a transformation
transformation_id = ''
tmethod = 'POST'
tendpoint = 'dbt/transformations/' + transformation_id + '/run'
tpayload = None  # this endpoint takes no request body

#Submit Transformation
tresponse = atlas(tmethod, tendpoint, tpayload)
We can see the Transformation was successfully kicked off, and if we hop back into the UI we can see it is currently running.
After a few minutes the transformation completes successfully.
If you recall, this transformation needs to union the employees tables from all of our databases.
{{
config(materialized='table')
}}
{% set databases = ['t_400_hr', 't_401_hr', 't_402_hr'] %}
{% set table = 'employees' %}
{% for database in databases %}
select
*,
'{{ database }}' as schema_name
from {{ source(database, table) }}
{% if not loop.last %} union all {% endif %}
{% endfor %}
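A plain Python loop reproduces what dbt's Jinja loop renders. This sketch is ours, and the fully qualified source names it emits are illustrative — in the real model, dbt's `source()` resolves them from the project's sources.yml:

```python
def render_union_sql(schemas, table="employees"):
    """Mimic the Jinja loop above: one SELECT per schema, glued with UNION ALL."""
    selects = [
        f"select *, '{schema}' as schema_name from {schema}.{table}"
        for schema in schemas
    ]
    return "\nunion all\n".join(selects)

sql = render_union_sql(["t_400_hr", "t_401_hr", "t_402_hr"])
```

For three schemas this yields three SELECTs joined by two UNION ALLs, with each row tagged by the schema it came from.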
Looking at the resulting output, we can see that we’ve combined the data into a single table.
It’s important to note that Fivetran Transformations with dbt can be fully configured via the REST API. To start, you will need a working dbt project hosted in git. Then follow the steps:
We can even configure these transformations to use Integrated Scheduling which means we can just rely on the connectors completing a successful sync, which then immediately kicks off the transformation.
Next time, we will cover custom transformations and data modeling using webhooks and the Fivetran Transformations for dbt Core Management API.
We have successfully created an automation script for a cloud data pipeline. With this script, we create and manage the Fivetran resources, capture events, and construct the reporting data model for our end users. We’ve done this using a single Python script which can be adapted to your use case and scale. Likewise, the config file can be adapted as needed by adding or removing parameters that are read in and used by the script.
Fivetran_demo_pipe.py combines all the logic to accomplish the following: create the group, destination, and webhook; deploy and configure the connectors; trigger syncs; and run the dbt transformation.
*The resources linked throughout this blog are intended to kickstart automation projects.
02-02-2024 04:21 PM
The Fivetran API uses OpenAPI! So in theory we could generate an API client using OpenAPI generators. Someone should publish a package on PyPI!
😛