Fivetran's source-side filtering: Cloud Functions revolutionize your data pipeline.


Authors: Elijah Davis & Kirk Van Arkel

 

What is a Cloud Function Connector?

In the simplest terms, a cloud function connector is customized code that Fivetran orchestrates and maintains. In the phrase “cloud function connector,” think of “cloud function” as the customized code, while “connector” refers to Fivetran’s management of that code.

A cloud function is code that runs on a serverless platform such as AWS Lambda, Google Cloud Functions, or Azure Functions. Cloud functions can be executed manually, but a production pipeline typically needs a scheduler, an invoker, and something to maintain state between runs. Fivetran handles ALL OF THIS.
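To make that division of labor concrete, here is a simplified sketch of the exchange, mirroring the response format we build later in this article (the table name and field values are illustrative):

# Fivetran invokes the function, passing in its saved state and your secrets:
request = {
    "state": {"since_id": "2023-10-31T14:00:00Z"},
    "secrets": {"api_key": "..."}
}

# The function returns new records plus updated state; Fivetran schedules
# the next invocation and persists that state for you:
response = {
    "state": {"since_id": "2023-10-31T15:00:00Z"},
    "insert": {"my_table": [{"id": 1, "value": "example"}]},
    "hasMore": False
}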

When to use a Cloud Function Connector?

A Fivetran Cloud Function connector can be beneficial in many different scenarios. Most commonly, we recommend building a cloud function connector when there isn’t a native Fivetran connector already available to sync data from a given source. While you can always submit a Lite Connector By Request form for us to create a connector that isn’t yet supported, the feasibility and timeline of these connectors cannot always be guaranteed. With cloud functions, the timeline is your own, and on top of that, your options for customization are seemingly endless.

Not only can cloud functions help you develop integrations for data sources that Fivetran does not yet natively support, but they also allow for a high level of customization with ANY data source that can be accessed via code.

Source-Side Filtering

The primary topic of this article is a lesser-known use of a Cloud Function connector: source-side filtering. It is as simple as it sounds: extracting a filtered subset of data from the source itself. This strategy* is especially useful during the development stage for quick, highly customized testing. The idea of source-side filtering is best understood through an example walkthrough.

*This is similar to the custom_payload in AWS

Developing a Cloud Function

The first step in developing a cloud function is understanding what data you would like to integrate and ensuring that it can be accessed and extracted programmatically. If the data source exposes an API, as in the example we will review, it is important to first identify which endpoints you want to retrieve data from. Next, determine what authentication or credentials are needed to access the data programmatically. Is it a username and password? An API key and secret? In our example, the API is public and thus does not require authentication.
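As a minimal sketch of the two most common patterns with the requests library (the endpoint, header names, and credentials below are illustrative, not tied to any particular API):

import requests

# Pattern 1: API key and secret passed as request headers
r = requests.get(
    "https://api.example.com/data",
    headers={"X-API-Key": "my_key", "X-API-Secret": "my_secret"}
)

# Pattern 2: HTTP basic auth with a username and password
r = requests.get("https://api.example.com/data", auth=("user", "pass"))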

Source Data Checklist:

  • Source: London Tube API
  • Endpoint: Tube Status
  • Authentication: Open/Public (no API key, OAuth token, or username/password required)
  • Primary Key: To be defined after data retrieval.
  • Parameters: Sensitive = use the secrets object. Non-sensitive = use custom_payload (AWS only). See the sketch after this list.
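As a hedged sketch of how those two kinds of parameters might be read inside the function (the key names here are illustrative; consult your connector's setup for the exact shape of the custom payload):

# Sensitive values ride in the secrets object configured in the Fivetran setup form
api_key = request["secrets"].get("api_key")

# Non-sensitive tuning values can ride in the AWS-only custom payload;
# "custom_payload" and "page_size" are illustrative names, not a guaranteed API
page_size = request.get("custom_payload", {}).get("page_size", 100)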

Using the example code below, we hit the tube/status endpoint programmatically to ensure we can connect to the data source. Following best practices, we include error checking throughout.

 

import logging
import json
import requests

def main():

    try:
        # Make the request inside the try block so RequestException is caught
        result = requests.get(
            'https://api.tfl.gov.uk/line/mode/tube/status',
            headers={
                "content-type": "application/json",
                "charset": "utf-8"
            }
        )
        since_id = None
        timeline = json.loads(result.text)
    except requests.RequestException as e:
        logging.error(e)
        print("Error accessing the API")
    except json.JSONDecodeError as e:
        logging.error(e)
        print("Error decoding JSON response")
    else:
        print("Success! The response is:")
        print(json.dumps(timeline, indent=4))

if __name__ == "__main__":
    main()

 

Data Response Format:

We have successfully connected to the source and retrieved an initial set of results. A few things to keep in mind:

  • Do you need all the data that is provided back in the response?
  • If not, outline elements or unique values that could be used to filter the data.
  • Decide which data is important/necessary for your use case.
  • Parse out the data points that you need and flatten the data if it’s nested JSON.
  • Follow Fivetran's best practices outlined here

Using the code below, we can build a table to hold our response data. The table is represented as a list of dictionaries composed of the JSON response data. For this example, we are using a Pandas DataFrame to validate our data structure.

 

import logging
import json
import requests

def main():

    try:
        # Make the request inside the try block so RequestException is caught
        result = requests.get(
            'https://api.tfl.gov.uk/line/mode/tube/status',
            headers={
                "content-type": "application/json",
                "charset": "utf-8"
            }
        )
        since_id = None
        timeline = json.loads(result.text)
    except requests.RequestException as e:
        logging.error(e)
        print("Error accessing the API")
    except json.JSONDecodeError as e:
        logging.error(e)
        print("Error decoding JSON response")
    else:
        tflLineStatus = []
        for t in timeline:
            # Remember the first "modified" value we encounter, which is the most recent
            if since_id is None:
                since_id = t["modified"]

            # Build one record per line; add data elements to the cloud function, then test
            tflLineStatus.append({
                "linename": t["id"],
                "linestatus": t["lineStatuses"][0]["statusSeverityDescription"],
                "timestamp": t["created"],
                "mode": t["modeName"],
                "ls": t["lineStatuses"]
                # Example of a data element NOT present on all records; used below
                # to demonstrate the status-details error:
                # "delay_reason": t["lineStatuses"][0]["reason"]
            })

        print(json.dumps(tflLineStatus, indent=4))
        print("state: " + since_id)

        # View your list of dictionaries (records) in table format with a DataFrame
        import pandas as pd
        df = pd.DataFrame(tflLineStatus)
        print(df.head(5))

if __name__ == "__main__":
    main()

 

Data validation example:

The DataFrame lets you check what the data will look like in table format. Using the Python pandas package, just three lines of code convert your list of dictionaries into a DataFrame and print the first N rows of your destination table (in the code above, N=5). The DataFrame output in your terminal will look like this:

[Screenshot: DataFrame output showing the first five rows]

From here, you might decide to (a quick pandas sketch follows this list):

  • Change the format of the timestamp column
  • Flatten the nested ls column further
  • Extract data on the bakerloo linename
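For instance, those three tweaks might look like the sketch below. The sample record is illustrative, shaped like the tflLineStatus entries built above:

import pandas as pd

# A sample record shaped like the tflLineStatus entries built above
records = [{
    "linename": "bakerloo",
    "linestatus": "Good Service",
    "timestamp": "2023-10-31T14:00:00Z",
    "mode": "tube",
    "ls": [{"statusSeverityDescription": "Good Service", "statusSeverity": 10}],
}]
df = pd.DataFrame(records)

# 1) Change the format of the timestamp column
df["timestamp"] = pd.to_datetime(df["timestamp"])

# 2) Flatten the nested ls column (first lineStatuses entry per record)
ls_flat = pd.json_normalize(df.pop("ls").str[0].tolist()).add_prefix("ls_")
df = df.join(ls_flat)

# 3) Extract data on the bakerloo linename
bakerloo = df[df["linename"] == "bakerloo"]
print(bakerloo)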

Error Checking:

As we work through our data validation checks, you can simulate an error by adding "delay_reason": t["lineStatuses"][0]["reason"] to the record dictionary; the function fails because reason is not present in every record. To troubleshoot this, we will parse the original response data from step 1 using the example code below.

In the code block below, we added logic to:

  • Check t["lineStatuses"][0]["statusSeverityDescription"] for each entry to review its delay data.
  • Isolate specific records based on a wildcard pattern.
  • Define a new variable s to store the delay reason associated with a given line.
  • Condense our output model to show only pertinent data.

With these changes, we now have a table that will help us review line status, timing, and delay reasons.

 

import logging
import json
import requests
import fnmatch

def main():

    try:
        # Make the request inside the try block so RequestException is caught
        result = requests.get(
            'https://api.tfl.gov.uk/line/mode/tube/status',
            headers={
                "content-type": "application/json",
                "charset": "utf-8"
            }
        )
        since_id = None
        timeline = json.loads(result.text)
    except requests.RequestException as e:
        logging.error(e)
        print("Error accessing the API")
    except json.JSONDecodeError as e:
        logging.error(e)
        print("Error decoding JSON response")
    else:
        tflLineStatus = []
        for t in timeline:
            # Remember the first "modified" value we encounter, which is the most recent
            if since_id is None:
                since_id = t["modified"]

            # Match any status description that mentions a delay
            wildcard_pattern = "*Delay*"
            status = t["lineStatuses"][0]["statusSeverityDescription"]
            if fnmatch.fnmatch(status, wildcard_pattern):
                s = status + ": On " + t["lineStatuses"][0]["reason"]
                print(s)
            else:
                s = "On Time"
                print(status)

            # Add all tflLineStatus records
            tflLineStatus.append({
                "linename": t["id"],
                "linestatus": status,
                "timestamp": t["created"],
                "mode": t["modeName"],
                "delay_reason": s
            })

        print(json.dumps(tflLineStatus, indent=4))

        # View your list of dictionaries (records) in table format with a DataFrame
        import pandas as pd
        df = pd.DataFrame(tflLineStatus)
        print(df.head(10))

if __name__ == "__main__":
    main()

 

- Output example when testing the response data for ["lineStatuses"][0]["statusSeverityDescription"]: [screenshot of terminal output]

- Output example for the table layout, after including the logic for and value of s: [screenshot of DataFrame table output]

Source Side Filtering Summary:

With our pipeline almost complete, it is time to test a few datasets and methodologies for populating the data. For this example (the sketch after this list shows how each value is read from the secrets object):

  • We need to load ‘n’ records into Snowflake to test the table structure. {Secrets Object [response_num]}
  • We need to filter this dataset as needed. {Secrets Object [wildcard_pattern]}
  • We need the ability to define the table/schema dynamically. {Secrets Object [schema]}
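A minimal sketch of reading those three values inside the function, assuming the secrets object shown later in this section:

# Pull the development knobs out of the secrets object Fivetran passes in
schema = request["secrets"]["schema"]                      # dynamic table/primary-key layout
response_num = request["secrets"].get("response_num", 5)   # cap on records loaded while testing
wildcard_pattern = request["secrets"].get("wildcard_pattern", "*Delay*")  # source-side filter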

The schema for a Cloud Function connector is defined in the JSON response that Fivetran expects back from the cloud function. Within the schema, we define the tables and their primary keys. Typically, the schema is hard-coded within the cloud function. However, during development, the exact schema we want to create may not yet be known.

Instead of hard-coding the schema within the cloud function itself, we can make it dynamic by defining it within the secrets object of the cloud function connector in the Fivetran connector setup form. Doing this allows us to:

  • Change table names
  • Choose which tables to migrate
  • Change the primary key of tables
  • Remove the schema object entirely

Note that the last option, removing the schema object, will result in append-only behavior from the connector. A minimal sketch of that fallback follows.
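This sketch reuses the response format from our example; only the conditional schema handling is new, and it is illustrative:

# Build the Fivetran response; include the schema only when the secrets
# object defines one. Without it, no primary keys are declared, so the
# connector appends rows instead of upserting them.
response = {
    "state": {"since_id": since_id},
    "insert": {"tflLineStatus": tflLineStatus},
    "hasMore": False
}
if "schema" in request["secrets"]:
    response["schema"] = request["secrets"]["schema"]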

Example of source-side filtering:

As we review London Tube API data, we notice patterns in the response data that inspire a new table, delay_timing. To create it, we will add filtering logic to the function and a parameter to our secrets object. The new table will hold delayed status records and their associated timestamps.

*Pro Tip: The wildcard_pattern parameter is another example of functionality that could be driven by the secrets object. Try it out!

To test the changes, we will add logic to limit our response set to the first 5 responses, using another parameter in our secrets object, response_num:

{
   "schema":{
      "tfLineStatus":[
         "linename",
         "timestamp"
      ],
      "delay_timing":[
         "linename",
         "timestamp",
         "delay_reason"
      ]
   },
   "response_num":5
}

This will result in a code base like the one below.

import logging
import json
import requests
import fnmatch

def main(request):

    try:
        # Make the request inside the try block so RequestException is caught
        result = requests.get(
            'https://api.tfl.gov.uk/line/mode/tube/status',
            headers={
                "content-type": "application/json",
                "charset": "utf-8"
            }
        )
        since_id = None
        timeline = json.loads(result.text)
    except requests.RequestException as e:
        logging.error(e)
        print("Error accessing the API")
    except json.JSONDecodeError as e:
        logging.error(e)
        print("Error decoding JSON response")
    else:
        tflLineStatus = []
        delay_timing = []
        response_num = request["secrets"]["response_num"]
        print("Checking for the first " + str(response_num) + " responses.")
        for i, t in enumerate(timeline):
            if i >= response_num:
                break
            # Remember the first "modified" value we encounter, which is the most recent
            if since_id is None:
                since_id = t["modified"]

            # Match any status description that mentions a delay
            wildcard_pattern = "*Delay*"
            status = t["lineStatuses"][0]["statusSeverityDescription"]
            if fnmatch.fnmatch(status, wildcard_pattern):
                s = status + ": On " + t["lineStatuses"][0]["reason"]
                # Add delayed records to the filtered table
                delay_timing.append({
                    "linename": t["id"],
                    "linestatus": status,
                    "timestamp": t["created"],
                    "mode": t["modeName"],
                    "delay_reason": s
                })
            else:
                s = "On Time"

            # Add every record to the full status table
            tflLineStatus.append({
                "linename": t["id"],
                "linestatus": status,
                "timestamp": t["created"],
                "mode": t["modeName"],
                "delay_reason": s
            })

        # View your lists of dictionaries (records) in table format with DataFrames
        import pandas as pd
        df = pd.DataFrame(tflLineStatus)
        dfd = pd.DataFrame(delay_timing)
        print("Review Line Status Table")
        print(df.head(10))
        print("Review Delayed Line Table")
        print(dfd.head(10))

        # Send the JSON response back to Fivetran
        ans = {
            # Remember the most recent id, so our requests are incremental
            "state": {
                "since_id": since_id
            },
            "schema": request["secrets"]["schema"],
            "insert": {
                "tflLineStatus": tflLineStatus,
                "delay_timing": delay_timing
            },
            "hasMore": False
        }
        print("Fivetran Response Format")
        print(ans)

if __name__ == "__main__":
    # Local test harness: pass in the secrets object from the Fivetran setup form
    main({
        "secrets": {
            "schema": {
                "tflLineStatus": ["linename", "timestamp"],
                "delay_timing": ["linename", "timestamp", "delay_reason"]
            },
            "response_num": 5
        }
    })

Example of Changing Primary Key:

Looking at our example with the London Tube API, let’s say we want the primary key of our table to be dynamic. Originally, we only wanted the most recent record for each line, so we defined the primary key of the table to be the linename column. Using the secrets object:

{"schema": {"tflLineStatus": ["linename"]}}

Later on, we decide we need a historical view of the data, so we add timestamp as part of the primary key. Thus, we can define the secrets object:

{"schema": {"tflLineStatus": ["linename", "timestamp"]}}

We can bring this object into our cloud function code with the following code:

schema = request["secrets"]["schema"]

Overall, this creates a secrets object like:

{
   "schema":{
      "tfLineStatus":[
         "linename",
         "timestamp"
      ],
      "delay_timing":[
         "linename",
         "timestamp",
         "delay_reason"
      ]
   },
   "response_num":5
}

With the tips outlined in this article, start leveraging Cloud Functions today and streamline the data pipelines of tomorrow! Next time, we will discuss Cloud Function architecture, pagination, and cursoring.

CSP setup & Fivetran setup questions? Book a call with Services

Conclusion

We have successfully demonstrated how to leverage Fivetran cloud functions to achieve source-side filtering. The function filters responses from an API to create two tables (tflLineStatus and delay_timing) and formats the output using the Fivetran response format. This process can be used to kick-start data projects tailored to data science, ad hoc data pulls, and non-native sources. Fivetran offers a robust catalog of 350+ prebuilt connectors; however, data science and the solutions of tomorrow might require non-native sources today. Rest assured that Fivetran has the solution. All you need to do is orchestrate it.
