10-31-2023 01:54 PM
In the simplest terms, a cloud function connector is customized code that Fivetran orchestrates and maintains. In the phrase “cloud function connector”, you can think of the “cloud function” portion as the customized code, while “connector” refers to Fivetran’s management of that code.
A cloud function is code that runs on a serverless platform such as AWS Lambda, Google Cloud Functions, or Azure Functions. Cloud functions can be executed manually, but you usually want a scheduler to trigger them, an invoker to call them, and something to maintain state between runs. Fivetran handles all of this.
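To make this division of labor concrete, here is a minimal sketch of the contract between Fivetran and your code, assuming the request/response format described in Fivetran's cloud function documentation: Fivetran sends the saved `state` (plus `secrets`), and the function returns new `state`, the records to `insert`, a `schema`, and `hasMore`. The names `handler`, `example_table`, and the `sync_count` cursor are hypothetical placeholders, and the records are hard-coded instead of coming from a real API.

```python
def handler(request: dict) -> dict:
    """Minimal sketch of a Fivetran-style cloud function handler.

    `request` is the parsed JSON body Fivetran sends; only its "state"
    key is used here. The records are placeholders for a real API call.
    """
    state = request.get("state") or {}

    # Hypothetical cursor: count how many syncs have run so far.
    sync_count = state.get("sync_count", 0) + 1

    records = [{"id": "example", "value": sync_count}]  # placeholder data

    return {
        "state": {"sync_count": sync_count},  # Fivetran passes this back on the next sync
        "insert": {"example_table": records},
        "schema": {"example_table": {"primary_key": ["id"]}},
        "hasMore": False,  # no further pages to fetch in this sync
    }


if __name__ == "__main__":
    first = handler({"state": {}, "secrets": {}})
    second = handler({"state": first["state"], "secrets": {}})
    print(first["state"], second["state"])  # {'sync_count': 1} {'sync_count': 2}
```

Because the function itself is stateless, anything it needs to remember between runs must travel through `state`; Fivetran stores it and hands it back on the next invocation.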
A Fivetran Cloud Function connector can be beneficial in many different scenarios. Most commonly, we recommend building a cloud function connector when there isn’t a native Fivetran connector already available to sync data from a given source. While you can always submit a Lite Connector By Request form for us to create a connector that isn’t yet supported, the feasibility and timeline of these connectors cannot always be guaranteed. With cloud functions, the timeline is your own, and on top of that, your options for customization are seemingly endless.
Not only can cloud functions help you develop integrations for data sources that Fivetran does not yet natively support, they also allow a high level of customization for any data source that can be accessed via code.
The primary topic of this article is a lesser-known way of using a Cloud Function connector: source-side filtering. It is as simple as it sounds: you extract only a filtered subset of data from the data source at hand. This strategy* is especially useful during development for quick, highly customized testing. Source-side filtering is best understood through an example walkthrough.
*This is similar to the custom_payload in AWS
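In code, source-side filtering boils down to dropping records before they are returned to Fivetran, so only the subset you care about ever reaches the destination. A minimal sketch, assuming records are plain dictionaries; `filter_at_source` and the sample records are hypothetical and only loosely shaped like the TfL data used later.

```python
from typing import Callable, Iterable, List


def filter_at_source(records: Iterable[dict], keep: Callable[[dict], bool]) -> List[dict]:
    """Return only the records that pass `keep`, before anything is sent to Fivetran."""
    return [r for r in records if keep(r)]


# Placeholder records shaped loosely like the TfL line status data used later.
sample = [
    {"linename": "central", "linestatus": "Good Service"},
    {"linename": "district", "linestatus": "Minor Delays"},
]

delayed_only = filter_at_source(sample, lambda r: "Delay" in r["linestatus"])
print(delayed_only)  # [{'linename': 'district', 'linestatus': 'Minor Delays'}]
```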
The first step in developing a cloud function is understanding what data you want to integrate and ensuring that it can be accessed and extracted programmatically. If the data source has an API, like the one in our example, first identify which endpoints you want to retrieve data from. Next, determine what authentication or credentials are needed to access the data programmatically. Is it a username and password? An API key and secret? In our example, the API is public and does not require authentication.
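When a source does need credentials, one option is to keep them in the connector's secrets object and let the function pick an authentication style at runtime. The sketch below assumes hypothetical secret names (`api_key`, `username`, `password`) and a Bearer-token header; check your API's documentation for the header or scheme it actually expects. The returned tuple is in the form the `auth=` argument of `requests.get` accepts for basic authentication.

```python
from typing import Optional, Tuple


def build_auth(secrets: dict) -> Tuple[dict, Optional[Tuple[str, str]]]:
    """Pick an authentication style based on which (hypothetical) secrets are set.

    Returns (headers, basic_auth); basic_auth is a (username, password) tuple
    usable as the `auth=` argument of requests.get, or None.
    """
    headers = {"Accept": "application/json"}
    if "api_key" in secrets:
        # Assumed Bearer scheme; many APIs use a different header name.
        headers["Authorization"] = f"Bearer {secrets['api_key']}"
        return headers, None
    if "username" in secrets and "password" in secrets:
        return headers, (secrets["username"], secrets["password"])
    # Public API (like the TfL example): no credentials needed.
    return headers, None
```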
Using the example code below, we call the tube status endpoint (https://api.tfl.gov.uk/line/mode/tube/status) to confirm that we can connect to the data source. Following best practices, we also include error handling.
import logging
import json
import requests


def main():
    try:
        # Call the public TfL tube status endpoint (no authentication required).
        # The request is inside the try block so network errors are caught below.
        result = requests.get(
            'https://api.tfl.gov.uk/line/mode/tube/status',
            headers={"Accept": "application/json"},
            timeout=30,
        )
        result.raise_for_status()  # turn HTTP 4xx/5xx responses into exceptions
        timeline = json.loads(result.text)
    except json.JSONDecodeError as e:
        logging.error(e)
        print("Error decoding JSON response")
    except requests.RequestException as e:
        logging.error(e)
        print("Error accessing the API")
    else:
        print("Success! The response is:")
        print(json.dumps(timeline, indent=4))


if __name__ == "__main__":
    main()
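If you want to exercise the error handling without hitting the API, it can help to separate decoding and validation from the HTTP call. This is a sketch under the assumption that a valid tube status response is a JSON array of objects; `parse_status_payload` is a hypothetical helper name.

```python
import json
import logging
from typing import Optional


def parse_status_payload(text: str) -> Optional[list]:
    """Decode a response body and check it is a JSON array of objects.

    Returns the decoded list, or None (after logging) if the body is not
    valid JSON or is not shaped like the line status response.
    """
    try:
        payload = json.loads(text)
    except json.JSONDecodeError as e:
        logging.error("Error decoding JSON response: %s", e)
        return None
    if not isinstance(payload, list) or not all(isinstance(item, dict) for item in payload):
        logging.error("Unexpected payload shape: %s", type(payload).__name__)
        return None
    return payload
```

In `main()`, `json.loads(result.text)` could then be swapped for `parse_status_payload(result.text)`, with a `None` result treated as a failed sync.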
We have successfully connected to the source and retrieved an initial set of results. A few things to keep in mind:
Using the code below, we can build a table to hold our response data. The table is represented as a list of dictionaries built from the JSON response data. For this example, we use a pandas DataFrame to validate our data structure.
import logging
import json
import requests
import pandas as pd


def main():
    try:
        result = requests.get(
            'https://api.tfl.gov.uk/line/mode/tube/status',
            headers={"Accept": "application/json"},
            timeout=30,
        )
        result.raise_for_status()
        timeline = json.loads(result.text)
    except json.JSONDecodeError as e:
        logging.error(e)
        print("Error decoding JSON response")
    except requests.RequestException as e:
        logging.error(e)
        print("Error accessing the API")
    else:
        # print("Success! The response is:")
        # print(json.dumps(timeline, indent=4))
        since_id = None
        tflLineStatus = []
        for t in timeline:
            # Remember the "modified" timestamp of the first record as our state
            if since_id is None:
                since_id = t["modified"]
            # Add every line to tflLineStatus
            tflLineStatus.append({
                "linename": t["id"],
                "linestatus": t["lineStatuses"][0]["statusSeverityDescription"],
                "timestamp": t["created"],
                # Add data elements to the cloud function here, then test.
                "mode": t["modeName"],
                "ls": t["lineStatuses"],
                # Example of a data element NOT present in all records (uncomment to demo the error):
                # "delay_reason": t["lineStatuses"][0]["reason"],
            })
        print(json.dumps(tflLineStatus, indent=4))
        print("state: " + str(since_id))
        # View your list of dictionaries (records) in table format with a DataFrame
        df = pd.DataFrame(tflLineStatus)
        print(df.head(5))


if __name__ == "__main__":
    main()
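A field that appears in only some records, like the commented-out `delay_reason` above, is easy to miss: pandas will quietly fill the gaps with NaN. Before building the DataFrame, a stdlib-only check can list such columns. This is a sketch; `check_columns` is a hypothetical helper and the sample records are placeholders.

```python
from typing import List, Set


def check_columns(records: List[dict]) -> Set[str]:
    """Return the column names that appear in some records but not in all of them."""
    if not records:
        return set()
    all_cols = set().union(*(r.keys() for r in records))
    common = set(records[0]).intersection(*(r.keys() for r in records[1:]))
    return all_cols - common


sample = [
    {"linename": "central", "linestatus": "Good Service"},
    {"linename": "district", "linestatus": "Minor Delays", "delay_reason": "placeholder"},
]
print(sorted(check_columns(sample)))  # ['delay_reason']
```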
The DataFrame lets you check what the data will look like in table format. With the Python pandas package, just three lines of code convert your list of dictionaries into a DataFrame and print the first N rows of your destination table (N=5 in the code above). The DataFrame output in your terminal will look like this:
From here, you might decide to:
As we work through our data validation checks, we can simulate an error by uncommenting the "delay_reason": t["lineStatuses"][0]["reason"] line, which makes the function fail with a KeyError. This happens because reason is not present in every record, yet the code accesses it unconditionally. To troubleshoot this, we parse the original response data (from the first code example) using the example code below.
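In the code block below, we added logic that matches each line's statusSeverityDescription against a wildcard pattern (*Delay*) using fnmatch, builds a delay_reason value from reason only when the status matches, and otherwise sets delay_reason to "On Time".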
With these changes, we now have a table that will help us review line status, timing, and delay reasons.
import logging
import json
import fnmatch
import requests
import pandas as pd


def main():
    try:
        result = requests.get(
            'https://api.tfl.gov.uk/line/mode/tube/status',
            headers={"Accept": "application/json"},
            timeout=30,
        )
        result.raise_for_status()
        timeline = json.loads(result.text)
    except json.JSONDecodeError as e:
        logging.error(e)
        print("Error decoding JSON response")
    except requests.RequestException as e:
        logging.error(e)
        print("Error accessing the API")
    else:
        # print("Success! The response is:")
        # print(json.dumps(timeline, indent=4))
        since_id = None
        tflLineStatus = []
        # Define a wildcard pattern that matches delayed statuses
        wildcard_pattern = "*Delay*"
        for t in timeline:
            # Remember the "modified" timestamp of the first record as our state
            if since_id is None:
                since_id = t["modified"]
            status = t["lineStatuses"][0]
            description = status["statusSeverityDescription"]
            if fnmatch.fnmatch(description, wildcard_pattern):
                # Only delayed statuses are expected to carry a "reason"
                s = description + ": On " + status.get("reason", "")
                print(s)
            else:
                print(description)
                s = "On Time"
            # Add every line to tflLineStatus
            tflLineStatus.append({
                "linename": t["id"],
                "linestatus": description,
                "timestamp": t["created"],
                "mode": t["modeName"],
                # "ls": t["lineStatuses"],
                "delay_reason": s,
                # Example of a data element NOT present in all records:
                # "delay_reason": t["lineStatuses"][0]["reason"],
            })
        print(json.dumps(tflLineStatus, indent=4))
        # print("state: " + str(since_id))
        # View your list of dictionaries (records) in table format with a DataFrame
        df = pd.DataFrame(tflLineStatus)
        print(df.head(10))


if __name__ == "__main__":
    main()
Output example when testing the response data for ["lineStatuses"][0]["statusSeverityDescription"]:
Output example for the table layout, after adding the logic for, and the value of, s:
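The wildcard check works because delayed statuses carry a reason. Another option is to read optional fields with `dict.get`, which returns a default instead of raising KeyError. The sketch below is a hypothetical alternative, not the article's logic: it keys on whether reason is present rather than on the wildcard, so any status that has a reason (not only delays) gets a combined description.

```python
def describe_status(line_status: dict) -> str:
    """Build a delay_reason-style value without raising KeyError when "reason" is absent."""
    description = line_status.get("statusSeverityDescription", "Unknown")
    reason = line_status.get("reason")  # None if the key is missing
    if reason is None:
        return description
    return f"{description}: On {reason}"


print(describe_status({"statusSeverityDescription": "Good Service"}))  # Good Service
```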
With our pipeline almost complete, it is time to test a few datasets and methodologies for populating the data. For this example:
The schema for a Cloud Function connector is defined in the JSON response that Fivetran expects back from the Cloud Function. Within the schema, we can define the tables and the primary keys of those tables. Typically, the schema is hard-coded within the Cloud Function. However, during development, we may not yet know exactly which schema we want to create.
Instead of hard-coding the schema within the cloud function itself, we can make it dynamic by defining it in the secrets object of the cloud function connector, in the Fivetran connector setup form. Doing this allows us to:
Note this will result in append-only behavior from the connector.
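As we understand Fivetran's documented response format, each table's key columns are nested under a primary_key entry ({"table": {"primary_key": [...]}}), while the secrets object in this article lists the columns directly. A small conversion keeps the secrets object short and still produces the nested form. This is a sketch; `to_fivetran_schema` is a hypothetical helper name.

```python
def to_fivetran_schema(pk_map: dict) -> dict:
    """Convert {"table": ["pk_col", ...]} from the secrets object into
    the response schema format {"table": {"primary_key": [...]}}."""
    return {table: {"primary_key": list(columns)} for table, columns in pk_map.items()}


secrets_schema = {"tflLineStatus": ["linename", "timestamp"]}
print(to_fivetran_schema(secrets_schema))
# {'tflLineStatus': {'primary_key': ['linename', 'timestamp']}}
```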
As we review the London Tube API data, we notice patterns in the response data that inspire a new table, delay_timing. To create it, we add filtering logic to the function and a parameter to our secrets object. The new table holds delayed-status records and their associated timestamps.
*Pro Tip: Parameter ‘wildcard_pattern’ is another example of functionality that could be driven by the Secrets object. Try it out!
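Here is one way the pro tip could look, as a sketch: the pattern is read from a hypothetical wildcard_pattern secret (falling back to "*Delay*"), and each line is routed into both the full table and, when it matches, the delayed table. It uses `fnmatch.fnmatchcase`, which is always case-sensitive, whereas `fnmatch.fnmatch` follows the operating system's case rules. The function name and sample records are placeholders.

```python
import fnmatch
from typing import List, Tuple


def split_by_status(lines: List[dict], secrets: dict) -> Tuple[List[dict], List[dict]]:
    """Split TfL-style line records into (all_lines, delayed_lines).

    The wildcard comes from a hypothetical "wildcard_pattern" secret,
    defaulting to "*Delay*" as in the article.
    """
    pattern = secrets.get("wildcard_pattern", "*Delay*")
    all_lines, delayed = [], []
    for line in lines:
        status = line["lineStatuses"][0]
        description = status["statusSeverityDescription"]
        row = {"linename": line["id"], "linestatus": description}
        if fnmatch.fnmatchcase(description, pattern):
            delayed.append({**row, "delay_reason": status.get("reason", "")})
        all_lines.append(row)
    return all_lines, delayed


sample = [
    {"id": "central", "lineStatuses": [{"statusSeverityDescription": "Good Service"}]},
    {"id": "district", "lineStatuses": [{"statusSeverityDescription": "Minor Delays", "reason": "placeholder reason"}]},
]
all_lines, delayed = split_by_status(sample, {})
print(len(all_lines), len(delayed))  # 2 1
```

Changing the secret to {"wildcard_pattern": "*Good*"} would route the Good Service line into the second table instead, with no code change.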
To test the changes, we add logic that limits our response set to the first 5 records, controlled by another parameter in our secrets object, response_num:
{
    "schema": {
        "tflLineStatus": [
            "linename",
            "timestamp"
        ],
        "delay_timing": [
            "linename",
            "timestamp",
            "delay_reason"
        ]
    },
    "response_num": 5
}
This will result in a code base like the one below.
import logging
import json
import fnmatch
import requests
import pandas as pd


def main(request):
    # `request` is the parsed JSON body Fivetran sends (it includes "state" and "secrets").
    # Adapt this to your platform's handler signature (e.g. call request.get_json()
    # on Google Cloud Functions).
    secrets = request["secrets"]
    try:
        result = requests.get(
            'https://api.tfl.gov.uk/line/mode/tube/status',
            headers={"Accept": "application/json"},
            timeout=30,
        )
        result.raise_for_status()
        timeline = json.loads(result.text)
    except json.JSONDecodeError as e:
        logging.error(e)
        print("Error decoding JSON response")
    except requests.RequestException as e:
        logging.error(e)
        print("Error accessing the API")
    else:
        since_id = None
        tflLineStatus = []
        delay_timing = []
        response_num = int(secrets["response_num"])
        # Define a wildcard pattern that matches delayed statuses
        wildcard_pattern = "*Delay*"
        print("Checking for the first " + str(response_num) + " responses.")
        for i, t in enumerate(timeline):
            if i >= response_num:
                break
            # Remember the "modified" timestamp of the first record as our state
            if since_id is None:
                since_id = t["modified"]
            status = t["lineStatuses"][0]
            description = status["statusSeverityDescription"]
            if fnmatch.fnmatch(description, wildcard_pattern):
                s = description + ": On " + status.get("reason", "")
                # Add delayed lines to the delay_timing table
                delay_timing.append({
                    "linename": t["id"],
                    "linestatus": description,
                    "timestamp": t["created"],
                    "mode": t["modeName"],
                    "delay_reason": s,
                })
            else:
                s = "On Time"
            # Add every line to the tflLineStatus table
            tflLineStatus.append({
                "linename": t["id"],
                "linestatus": description,
                "timestamp": t["created"],
                "mode": t["modeName"],
                "delay_reason": s,
            })
        # View your lists of dictionaries (records) in table format with a DataFrame
        print("Review Line Status Table")
        print(pd.DataFrame(tflLineStatus).head(10))
        print("Review Delayed Line Table")
        print(pd.DataFrame(delay_timing).head(10))
        # Fivetran expects {"table": {"primary_key": [...]}}; convert the secrets shorthand
        schema = {table: {"primary_key": pk} for table, pk in secrets["schema"].items()}
        # Send JSON response back to Fivetran
        ans = {
            # Remember the most recent timestamp, so our requests are incremental
            "state": {"since_id": since_id},
            "schema": schema,
            "insert": {
                "tflLineStatus": tflLineStatus,
                "delay_timing": delay_timing,
            },
            "hasMore": False,
        }
        print("Fivetran Response Format")
        print(json.dumps(ans, indent=4))
        return ans


if __name__ == "__main__":
    # Local test: mimic the request body Fivetran sends, using the secrets object above
    sample_request = {
        "state": {},
        "secrets": {
            "schema": {
                "tflLineStatus": ["linename", "timestamp"],
                "delay_timing": ["linename", "timestamp", "delay_reason"],
            },
            "response_num": 5,
        },
    }
    main(sample_request)
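Limiting the response set can also be written with `itertools.islice`, which stops after n items without an explicit counter. The sketch below also guards against a missing, non-numeric, or negative response_num in the secrets object; the helper name `first_n` and the default of 5 are assumptions.

```python
from itertools import islice


def first_n(records, secrets: dict, default: int = 5) -> list:
    """Return at most `response_num` records from any iterable (hypothetical default of 5)."""
    raw = secrets.get("response_num", default)
    try:
        n = int(raw)
    except (TypeError, ValueError):
        n = default
    # islice needs a non-negative stop value
    return list(islice(records, max(n, 0)))


print(first_n(range(10), {"response_num": 3}))  # [0, 1, 2]
```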
Looking at our example with the London Tube API, let’s say we want the primary key of our table to be dynamic. Originally, we only wanted the most recent record for each line, so we defined the primary key of the table to be the linename column. Using the secrets object:
{"schema": {"tflLineStatus": ["linename"]}}
Later on, we decide we need a historical view of the data, so we add timestamp to the primary key. The secrets object becomes:
{"schema": {"tflLineStatus": ["linename", "timestamp"]}}
We can bring this object into our cloud function code with the following code:
schema = request["secrets"]["schema"]
Overall, this creates a secrets object like:
{
    "schema": {
        "tflLineStatus": [
            "linename",
            "timestamp"
        ],
        "delay_timing": [
            "linename",
            "timestamp",
            "delay_reason"
        ]
    },
    "response_num": 5
}
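To see why adding timestamp to the primary key preserves history, here is a toy, in-memory model of primary-key upserts: rows with the same key overwrite earlier ones. It is a simplification for illustration only, not how Fivetran writes to the destination, and the two sample rows (including their timestamps) are made-up placeholders.

```python
def upsert(table: dict, rows: list, primary_key: list) -> dict:
    """Toy model of primary-key upserts: a row replaces any earlier row with the same key."""
    for row in rows:
        key = tuple(row[col] for col in primary_key)
        table[key] = row
    return table


sync_1 = [{"linename": "central", "timestamp": "2023-10-31T10:00:00Z", "linestatus": "Good Service"}]
sync_2 = [{"linename": "central", "timestamp": "2023-10-31T11:00:00Z", "linestatus": "Minor Delays"}]

latest_only = upsert(upsert({}, sync_1, ["linename"]), sync_2, ["linename"])
history = upsert(upsert({}, sync_1, ["linename", "timestamp"]), sync_2, ["linename", "timestamp"])
print(len(latest_only), len(history))  # 1 2
```

With linename alone, the second sync overwrites the first; with linename and timestamp, both rows survive as history.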
With the tips outlined in this article, start leveraging Cloud Functions today and streamline the data pipelines of tomorrow! Next time we will discuss Cloud Function architecture, pagination and cursoring.
We have successfully demonstrated how to use Fivetran cloud functions for source-side filtering. The function filters responses from an API into two tables (tflLineStatus and delay_timing) and formats the output in the Fivetran response format. This process can kick-start data projects tailored to data science, ad hoc data pulls, and non-native sources. Fivetran offers more than 350 prebuilt connectors; however, data science and the solutions of tomorrow may require non-native sources today. Rest assured that Fivetran has the solution: all you need to do is orchestrate it.