Fivetran Python API Framework

elijahdavis
Fivetranner

Authors: Elijah Davis & Angel Hernandez

Foreword

The Fivetran REST API allows you to perform a majority of Fivetran actions programmatically, which makes it possible to integrate Fivetran into existing workflows such as those commonly found in Continuous Integration / Continuous Deployment (CI/CD) processes. For example, setting up a couple of connectors in the Fivetran UI is easy, but when 100s or even 1000s of connectors need to be created, the REST API makes it easy to programmatically deploy, configure, and maintain a multitude of connectors. In this article, we will dive into and create an end-to-end pipeline using code samples made available by Fivetran.

Common Fivetran API Use Cases:

Automation of:

  • Pipeline creation, management, and access control.
  • Transformation layer and data model orchestration.
  • Data governance.
  • Every Fivetran resource you will use.

Scenario:

You’re an organization looking to adopt Fivetran to centralize data for all your departments in a particular data warehouse. Each department has a dedicated database with similarly structured schemas and tables. 

For a single database, you’re able to set up a connector in the Fivetran UI; however, you soon realize you’re going to have to do this multiple times with small changes to the configuration. You need to plan for the possibility of adding more departments, or changing data warehouses as your organization matures.

To achieve this and ensure scalability, you decide to automate resource deployment, management, and pipeline execution. You envision an end-to-end process that would eliminate manual work, and achieve the following:

  1. Create a new destination.
  2. Create a new webhook to monitor events.
  3. Add new connectors to the new destination.
  4. Configure the connectors’ schemas.
  5. Execute a sync on the new connectors.
  6. Execute a transformation to model the data.

Preparation:

  1. Review available resources:

Any implementation of the Fivetran REST API is going to begin with authentication using an API Key and API Secret Key. These keys are tied to the user that generates them, and the actions that can be performed using these keys are also tied to that user's Fivetran permissions. These keys can be generated in the API Keys section of your user menu in the Fivetran UI.
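The framework’s sample code passes the key pair through HTTP Basic authentication. As a minimal sketch of what that means on the wire, the snippet below builds the Authorization header by hand using only the standard library. The function name and the credential values are placeholders made up for illustration, not part of the framework.

```python
import base64


def basic_auth_header(api_key: str, api_secret: str) -> dict:
    """Build an HTTP Basic Authorization header from a key/secret pair."""
    raw = f"{api_key}:{api_secret}".encode("utf-8")
    token = base64.b64encode(raw).decode("ascii")
    return {"Authorization": f"Basic {token}"}


# Placeholder credentials, for illustration only.
headers = basic_auth_header("my_key", "my_secret")
print(headers)
```

In practice a helper such as `HTTPBasicAuth` from the requests library does this encoding for you, so you rarely need to build the header yourself.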

elijahdavis_0-1696014305594.png

With these keys we can now begin leveraging the API Framework. While the API Framework referenced above is implemented in Python, the Fivetran REST API can be utilized in a multitude of different ways, such as:

Interactions with any of these tools will require those API keys. For now, we’re going to focus on Python using the API Framework.

2. Review the common elements in the API Framework sample files:

    • We store sensitive data and parameters in an external file “config.json”
    • Example here
    • The first 45 lines of this code handle authentication to the Fivetran API and define our primary function, “atlas”, which delegates all requests to the Fivetran API.
import requests
from requests.auth import HTTPBasicAuth
import json
import colorama
from colorama import Fore, Back, Style
import time

#configuration file
r = '/config.json'
with open(r, "r") as i:
    l = i.read()
    y = json.loads(l)
api_key = y['API_KEY']
api_secret = y['API_SECRET']
a = HTTPBasicAuth(api_key, api_secret)

#Single entry point for every request to the Fivetran API; returns the parsed JSON body, or None if the request fails.
def atlas(method, endpoint, payload):

    base_url = 'https://api.fivetran.com/v1'
    url = f'{base_url}/{endpoint}'

    try:
        if method == 'GET':
            response = requests.get(url,auth=a)
        elif method == 'POST':
            response = requests.post(url, json=payload, auth=a)
        elif method == 'PATCH':
            response = requests.patch(url,json=payload, auth=a)
        elif method == 'DELETE':
            response = requests.delete(url, auth=a)
        else:
            raise ValueError('Invalid request method.')
        response.raise_for_status()  # Raise exception for 4xx or 5xx responses
        return response.json()
    except requests.exceptions.RequestException as e:
        print(f'Request failed: {e}')
        return None
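The configuration-loading lines at the top of the script can also be wrapped in a small helper that fails early when a credential is missing. This is only a sketch: `load_config` and `REQUIRED_KEYS` are names invented here, and the demo writes a throwaway config file with fake values so that the example runs on its own.

```python
import json
import tempfile
from pathlib import Path

# Keys the sample script reads from config.json for authentication.
REQUIRED_KEYS = ("API_KEY", "API_SECRET")


def load_config(path):
    """Read a JSON config file and check that the credential keys exist."""
    config = json.loads(Path(path).read_text(encoding="utf-8"))
    missing = [key for key in REQUIRED_KEYS if key not in config]
    if missing:
        raise KeyError(f"config file is missing: {', '.join(missing)}")
    return config


# Demo with a temporary file and fake credentials.
with tempfile.TemporaryDirectory() as tmp:
    demo_path = Path(tmp) / "config.json"
    demo_path.write_text(
        json.dumps({"API_KEY": "key", "API_SECRET": "secret"}),
        encoding="utf-8",
    )
    config = load_config(demo_path)

print(sorted(config))
```

Failing at load time gives a clearer error than a `KeyError` raised halfway through a deployment run.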

 

 

Once we’ve authenticated, we can now start to make the API calls to achieve the tasks we defined above. Using the sample code below we will break up our task into parts:

3. We will start by creating a new destination:

    • Then we will associate a webhook with that new destination. If you already have a destination in place we can skip the group and destination creation step and move on to creating the Webhook.
#Request settings (the group name is left blank here; fill in your own)
method = 'POST'
gendpoint = 'groups'
gpayload = {"name": ""}
endpoint = 'destinations'
#Submit group
gresp = atlas(method, gendpoint, gpayload)
if gresp is not None:
    print(Fore.CYAN + 'Call: ' + method + ' ' + gendpoint + ' ' + str(gpayload))
    print(Fore.GREEN +  'Response: ' + gresp['code'])
    print(Fore.MAGENTA + str(gresp))
    #Build the destination payload from the new group's id
    payload = {
        "group_id": gresp['data']['id'],
        "service": "big_query",
        "region": "US",
        "time_zone_offset": "-5",
        "config": {
            "project_id": "",
            "data_set_location": "US"
        }
    }
    #Submit destination
    dresponse = atlas(method, endpoint, payload)

 

 

We’ve gone with a BigQuery destination, but the same can be implemented for any other supported Fivetran Destination.
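Each step in the script only proceeds when the previous call returned a response, and later steps reuse the `id` found under `data`. A small helper can make that check explicit. This is a sketch under the assumption that responses follow the `code` / `data` / `id` shape used in the snippet above; `created_id` is a hypothetical name and the example values are made up.

```python
def created_id(response):
    """Return data.id from an API response dict, or None if unavailable."""
    if not isinstance(response, dict):
        # The request helper returns None when the HTTP call failed.
        return None
    data = response.get("data") or {}
    return data.get("id")


# Made-up example responses.
ok = {"code": "Success", "data": {"id": "group_123"}}
no_data = {"code": "Success"}

print(created_id(ok))       # the id of the created resource
print(created_id(no_data))  # None, because there is no data section
print(created_id(None))     # None, because the request itself failed
```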

Screenshot 2023-09-29 at 12.07.12 PM.png

4. Create a new webhook to monitor events:

The Webhook will ensure that we capture the events from our automation scripts, which in turn allows for external orchestration of other tools.

    • If you are testing webhooks locally, follow the steps outlined in Webhook Example to get the URL. It will be the HTTPS forwarding address printed by the command ‘ngrok http 4242’.

A successful run of ngrok should look something like this. The forwarding URL should be added to the webhooks section of the config.json file.

elijahdavis_3-1696010996192.png

As we submit more API calls, we can see webhooks coming in on our Flask server. Again, we’re just using this for development and testing purposes. These webhooks would eventually point to a permanent storage location such as SFTP or a cloud storage solution, or be processed by another application that acts on the types of events received, for example by sending a Slack message if a connector fails.
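To illustrate the “act on the type of event” idea, here is a minimal routing sketch. It assumes the webhook body is a JSON object with an `event` field and a `connector_id` field; treat both field names, the function name, and the sample payloads as assumptions to check against the payloads your own server receives.

```python
# Event types we want to alert on (taken from the list we subscribe to).
FAILURE_EVENTS = {"connection_failure"}


def route_event(payload: dict) -> str:
    """Decide what to do with one incoming webhook payload."""
    event = payload.get("event")
    connector = payload.get("connector_id", "unknown")
    if event in FAILURE_EVENTS:
        # In a real handler this is where a Slack message would be sent.
        return f"ALERT: {event} on connector {connector}"
    return f"logged: {event}"


# Made-up sample payloads.
failure = route_event({"event": "connection_failure", "connector_id": "abc"})
routine = route_event({"event": "create_connector", "connector_id": "abc"})
print(failure)
print(routine)
```

Keeping the routing decision in a plain function makes it easy to test without running the Flask server.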

Screenshot 2023-09-29 at 12.07.06 PM.png

We create the webhook using a POST:

    if dresponse is not None:
        print(Fore.CYAN + 'Call: ' + method + ' ' + endpoint + ' ' + str(payload))
        print(Fore.GREEN +  'Response: ' + dresponse['code'])
        print(Fore.MAGENTA + str(dresponse))
        #New Webhook from response data
        wgid = gresp['data']['id']
        wmethod = 'POST' 
        wendpoint = 'webhooks/group/' + wgid
        wpayload = { 
        "url": "https.ngrok-free.app",
        "events": [ "connection_successful",
        "connection_failure",
        "create_connector",
        "pause_connector",
        "resume_connector",
        "edit_connector",
        "delete_connector",
        "force_update_connector",
        "resync_connector",
        "resync_table"
        ]
        }
        #Submit Webhook
        print(Fore.CYAN + "Submitting Webhook") 
        wresponse = atlas(wmethod, wendpoint, wpayload)

Next, we will add some logic to create and configure ‘n’ number of connectors in the new destination. To do so, we will take code from create_connector.py and api_interact_schema_edit.py

5. To create the connectors, we will add logic like the block below:

  • The config.json file can store the connector definitions, such as but not limited to the name, schema selections, credentials, predefined payloads, and execution count. (Hint: Run for ‘n’)
  • If you are familiar with our Cloud Function connectors, use the config.json file like you would the Secrets or State objects.
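As a rough sketch of the config-driven approach described in the bullets above, the function below turns a list of connector definitions into one request payload per connector. The `CONNECTORS` key, the definition fields, the function name, and the host value are all hypothetical; adapt them to however you lay out your own config.json.

```python
def build_connector_payloads(group_id, definitions, password):
    """Build one connector-creation payload per definition."""
    payloads = []
    for definition in definitions:
        payloads.append({
            "service": definition["service"],
            "group_id": group_id,
            "config": {
                "schema_prefix": definition["name"],
                "host": definition["host"],
                "port": definition.get("port", 1433),
                "database": definition["database"],
                "user": definition["user"],
                "password": password,
            },
        })
    return payloads


# Hypothetical "CONNECTORS" section of config.json; the host is a placeholder.
example_config = {
    "CONNECTORS": [
        {"name": name, "service": "sql_server_rds",
         "host": "db.example.internal", "database": "sqlserver",
         "user": "fivetran"}
        for name in ("t_400", "t_401", "t_402")
    ]
}

payloads = build_connector_payloads(
    "group_123", example_config["CONNECTORS"], password="not-a-real-password"
)
print([p["config"]["schema_prefix"] for p in payloads])
```

Adding a department then becomes a config change (one more entry in the list) rather than a code change.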

 

 

        #Create Connectors
        p = y['T']                                 #source auth
        new_schemas = ["t_400", "t_401", "t_402"]  #connector names
        smethod = 'POST'
        sendpoint = 'connectors'
        #One connector per name; the loop variable no longer shadows the list
        for new_schema in new_schemas:
            spayload = {
                        "service": "sql_server_rds",
                        "group_id": wgid,
                        "trust_certificates": True,
                        "run_setup_tests": True,
                        "paused": False,
                        "pause_after_trial": True,
                        "config": { "schema_prefix": new_schema,
                                    "host":  "",
                                    "port": 1433,
                                    "database": "sqlserver",
                                    "user": "fivetran",
                                    "password": p
                        }}
            #Submit Connectors
            print(Fore.CYAN + "Submitting Connector")
            cresponse = atlas(smethod, sendpoint, spayload)

Notice that we loop over the databases we’ve defined (listed inline here, though they could equally be read from our config.json file), so that each database is associated with its own connector. The database name is used as the connector name, and every landed schema has the database name as a prefix. We end up with an `hr` schema for each database in our data warehouse.

elijahdavis_0-1696015612562.png

6. Configure the connectors’ schemas:

If we need to configure the connectors with a particular set of schemas, tables, or columns we can add some logic like this:

 

 

            #Configure the Schemas (still inside the connector loop)
            #PATCH https://api.fivetran.com/v1/connectors/{connector_id}/schemas/{schema}
            sgroup_id = wgid
            ssmethod = 'PATCH'
            ssendpoint = 'connectors/' + cresponse['data']['id'] + '/schemas/hr'
            sspayload = {
                "enabled": True,
                "tables": {
                    "employees": {
                        "enabled": True
                    },
                    "events": {
                        "enabled": False
                    }
                }
            }
            sresponse = atlas(ssmethod, ssendpoint, sspayload)

For now, we only care about the employees table from each database (department), so we’re going to update our connectors’ schemas to include only the employees table. To avoid interruptions, we first pause the connector and make the schema change. Next, we’ll see how we can kick off a sync with the Fivetran API.
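If the list of tables to keep changes per department, the schema payload can be generated rather than written out by hand. The sketch below reproduces the same payload shape as the PATCH request above; the function name and its parameters are made up for illustration.

```python
def schema_selection(all_tables, keep):
    """Build a schema payload that enables only the tables listed in keep."""
    keep = set(keep)
    return {
        "enabled": True,
        "tables": {name: {"enabled": name in keep} for name in all_tables},
    }


sspayload = schema_selection(["employees", "events"], keep=["employees"])
print(sspayload)
```

The resulting dictionary can be passed as the payload of the same PATCH call.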

Screenshot 2023-09-29 at 1.25.48 PM.png

7. Execute a sync on the new connectors:

Now we need to add logic to sync our new connectors and fire off a dbt transformation that will perform a union of our tables spread across the multiple databases we’ve synced.

  • Given the need for ‘n’ number of runs, we will include this logic in the for loop we defined during connector creation. With a request like the one below we will start syncs on our new connectors.

 

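#POST https://api.fivetran.com/v1/connectors/{connector_id}/sync
u_5 = 'https://api.fivetran.com/v1/connectors/' + cresponse['data']['id'] + '/sync'
j = {"force": True} #initiate the sync
s = requests.post(u_5, auth=a, json=j)
s.raise_for_status() #stop here if the sync request was rejected
print(Fore.GREEN + "Connector Sync Started")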

 

Now that we’ve given the command to start our sync we can unpause our connector and it will immediately begin. When the sync completes for all of our connectors, we’ll have the data in our data warehouse and we can start to think about transformations and data modeling.
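Because the transformation should only run once the syncs are done, a script may want to wait for each connector to finish. The sketch below polls a connector until it is no longer syncing. It assumes the connector details response exposes the state at `data.status.sync_state` and that `"syncing"` is the in-progress value; verify both against the API reference before relying on them. The fetch function is passed in so that the example runs without network access.

```python
import time


def wait_for_sync(fetch_connector, connector_id, poll_seconds=30, max_polls=60):
    """Poll until the connector leaves the 'syncing' state.

    Returns the last observed state, or None if max_polls is exhausted.
    """
    for _ in range(max_polls):
        details = fetch_connector(connector_id)
        data = (details or {}).get("data") or {}
        status = data.get("status") or {}
        state = status.get("sync_state")
        if state is not None and state != "syncing":
            return state
        time.sleep(poll_seconds)
    return None


# Fake fetcher standing in for a real GET request: two polls report
# "syncing", the third reports "scheduled".
_states = iter(["syncing", "syncing", "scheduled"])


def fake_fetch(connector_id):
    return {"data": {"id": connector_id, "status": {"sync_state": next(_states)}}}


final_state = wait_for_sync(fake_fetch, "demo_connector", poll_seconds=0)
print(final_state)
```

In the script, `fetch_connector` could be something like `lambda cid: atlas('GET', 'connectors/' + cid, None)`. Note that a connector may briefly report a non-syncing state right after the sync is requested, so a short initial delay, or relying on the webhook events instead, may be more robust.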

8. Execute a transformation to model the data:

For this demonstration of our REST API, we’re going to skip directly to running our Transformation to keep things concise. 

    • To facilitate the transformation execution, we are using logic from api_interact_run_dbt_transformation.py.
    • In this scenario we already have our transformation configured in Fivetran and we can retrieve the Transformation Id from the UI.

 

#Execute a transformation
transformation_id = ''
tmethod = 'POST'
tendpoint = 'dbt/transformations/' + transformation_id + '/run'
tpayload = ''
#Submit Transformation
tresponse = atlas(tmethod, tendpoint, tpayload)

 

We can see the Transformation was successfully kicked off, and if we hop back into our UI, we can see it is currently running.

Screenshot 2023-09-29 at 12.07.45 PM.png

After a few minutes the transformation completes successfully.

Screenshot 2023-09-29 at 12.07.50 PM.png

As you may recall, this transformation needed to union the employees tables from all of our databases.

 

{{ config(materialized='table') }}

{% set databases = ['t_400_hr', 't_401_hr', 't_402_hr'] %}
{% set table = 'employees' %}

{% for database in databases %}
select
  *,
  '{{ database }}' as schema_name
from {{ source(database, table) }}
{% if not loop.last %} union all {% endif %}
{% endfor %}
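To make the loop in the model above concrete, the Python sketch below builds roughly the SQL that the template renders to: one select per schema, joined with `union all`. It is an illustration only. The function name is made up, and `schema.table` is a simplified stand-in for whatever relation dbt’s `source()` resolves to in your project.

```python
def union_all_sql(schemas, table):
    """Build one select per schema and join them with 'union all'."""
    selects = [
        f"select *, '{schema}' as schema_name from {schema}.{table}"
        for schema in schemas
    ]
    return "\nunion all\n".join(selects)


sql = union_all_sql(["t_400_hr", "t_401_hr", "t_402_hr"], "employees")
print(sql)
```

The `loop.last` check in the template plays the same role as `join` here: it keeps a trailing `union all` from being emitted after the final select.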

 

Looking at the resulting output, we can see that we’ve combined the data into a single table.

Screenshot 2023-09-29 at 12.04.32 PM.png

It’s important to note that Fivetran Transformations with dbt can be fully configured via the REST API. To start you will need to have a working dbt project hosted in git. Then follow the steps:

We can even configure these transformations to use Integrated Scheduling which means we can just rely on the connectors completing a successful sync, which then immediately kicks off the transformation.

elijahdavis_0-1696010623369.png

Next time, we will cover custom transformations and data modeling using webhooks and the Fivetran Transformations for dbt Core Management API.

In Summary…

We have successfully created an automation script for a cloud data pipeline. With this script, we create and manage the Fivetran resources, capture events, and construct the reporting data model for our end users. We’ve done this using a single script in a Python file, which can be adapted to your use case and scale. Likewise, the config file can be adapted as needed by adding or removing parameters that the Python script reads in and uses.

  • This script can be refactored in a variety of ways, but however the code* is implemented, the idea is to create a single point of entry for an automation that reduces manual work and scales as the data and the organization grow.
  • All of the sample code shown above was used to create:

Fivetran_demo_pipe.py, which combines all the logic to accomplish the following:

  • Create a new destination.
  • Create a webhook on the new destination to monitor Fivetran events.
  • Add new connectors to the new destination.
  • Configure the connectors’ schemas.
  • Execute a sync on the new connectors.
  • Execute a transformation to model the data.

*The resources linked throughout this blog are intended to kickstart automation projects. 

Questions? Book a Services Consultation with a Fivetran Expert

Code produced: Fivetran_demo_pipe.py

 

 

2 REPLIES

choochoo5trains
New Contributor II

The FiveTran API uses OpenAPI! So in theory we could generate an API using OpenAPI generators. Someone should publish a package on PyPI !

😛

davidcarlos
New Contributor

The Fivetran Python API framework offers a streamlined solution for integrating Fivetran data pipelines with Python applications. With its easy-to-use interface and robust functionality, developers can efficiently manage data synchronization processes, automate workflows, and extract valuable insights from diverse data sources. By leveraging this powerful framework, organizations can enhance data-driven decision-making and accelerate innovation with minimal effort and maximum efficiency.