Azure ETL Pipeline with Python

The Problem

We want to create an application that pulls data from different sources and surfaces that data to the front end. The following visualizes what that flow would look like.

Data Pipeline > PGSQL > API > Front End

The piece we're focusing on is the Data Pipeline, and the diagram below illustrates the solution for collecting the data and moving it to a database.

Data Factory = (Batch Service Extract > Storage Container > Batch Service Write > PGSQL)

Prerequisites

Pipeline Flow

  1. Extract and transform the data
  2. Write the data to csv files and upload them to an Azure storage container
  3. Read the uploaded csv files and write them to PGSQL

This pipeline will run two types of scripts.

  • ETL: Extract, transform and load data as csv into a storage container
  • Write: Read the csv files from the storage container and write them to the database.

The pipeline can use multiple ETL scripts but only one Write script.

To start, locate your designated storage account and create a container named scripts. This is the container where you will store all of your scripts.
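
If you prefer to create the scripts container programmatically, the snippet below is a minimal sketch using the same azure-storage-blob package installed later in this guide; the connection string placeholder is an assumption to replace with your own.

    from azure.storage.blob import BlobServiceClient

    # Assumption: replace with the connection string from your storage account
    connection_string = '{connection_string}'
    blob_service_client = BlobServiceClient.from_connection_string(connection_string)

    # Create the container that will hold the pipeline scripts
    blob_service_client.create_container('scripts')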

Installing Packages

Next we'll set up the nodes to run the scripts. We can connect to the nodes over SSH and install the Python packages from the terminal. Note: the node's operating system is canonical ubuntuserver 16.04-lts.

  1. Locate the Batch Account
  2. In the Overview click Connect and generate a random user
  3. Open a terminal and run the generated SSH command.
    ssh SampleUser@11.23.456.789 -p 50000
  4. Enter the generated password

Python 3.6

The following installs Python 3.6 and the required packages.

sudo add-apt-repository ppa:deadsnakes/ppa
sudo apt-get update
sudo apt-get install -y python3.6
# Update pip
sudo -H python3.6 -m pip install --upgrade pip
# Install packages
sudo -H python3.6 -m pip install pandas SQLAlchemy azure-storage-blob
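
To confirm the installation worked on a node, you can run a quick sanity check; the script below (the name verify_install.py is just an example) simply imports the packages and prints their versions.

    # verify_install.py - confirm the required packages import on the node
    # Run with: python3.6 verify_install.py
    import pandas
    import sqlalchemy
    import azure.storage.blob

    print('pandas', pandas.__version__)
    print('SQLAlchemy', sqlalchemy.__version__)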

Upload and Download Files

Now that the Batch service nodes are set up, we need our scripts to be able to read from and write to Azure. Azure has a Python package which allows you to manage blobs within your storage account. Blobs are objects which can hold text, images, and streaming media, in this case csv files. Add the following to your ETL and Write scripts respectively; a minimal end-to-end ETL sketch follows these steps.

  1. Locate your storage account.
  2. In Settings select Access Keys and copy the Connection string.

    import pandas as pd
    from io import StringIO
    from azure.storage.blob import BlobServiceClient, BlobClient, ContainerClient
    # Create the blob service to connect to Azure
    connection_string = '{connection_string}'
    blob_service_client = BlobServiceClient.from_connection_string(connection_string)
  3. Specify the container you want to access.
    container_name = '{container_name}'
    container_client = blob_service_client.get_container_client(container_name)

    Downloading
    filename = '{filename}.csv'
    # Downloads csv file into a dataframe
    downloaded_blob = container_client.download_blob(filename)
    df = pd.read_csv(StringIO(downloaded_blob.content_as_text()))

    Uploading
    df = pd.DataFrame()
    filename = '{filename}.csv'
    df.to_csv('./{file}'.format(file=filename))
    blob_client = container_client.get_blob_client(filename)
    with open(filename, "rb") as data:
        blob_client.upload_blob(data, blob_type="BlockBlob")
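
Putting these pieces together, an ETL script might look something like the sketch below. The source URL, transformation step, and file name are placeholder assumptions; the upload logic mirrors the snippets above.

    import pandas as pd
    from azure.storage.blob import BlobServiceClient

    # Placeholders - replace with your own values
    connection_string = '{connection_string}'
    container_name = '{container_name}'
    source_url = '{source_csv_url}'      # hypothetical data source
    filename = 'extracted_data.csv'      # example output name

    # Extract: read the raw data into a dataframe
    df = pd.read_csv(source_url)

    # Transform: example clean-up step (adjust to your data)
    df = df.dropna()

    # Load: write the dataframe to a csv file and upload it to the storage container
    df.to_csv(filename, index=False)
    blob_service_client = BlobServiceClient.from_connection_string(connection_string)
    container_client = blob_service_client.get_container_client(container_name)
    blob_client = container_client.get_blob_client(filename)
    with open(filename, 'rb') as data:
        blob_client.upload_blob(data, blob_type='BlockBlob', overwrite=True)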

Writing to PGSQL

The last step in the pipeline is to read all of the stored files and write them to the database. Add the following to your Write script; a sketch of a complete Write script follows these steps.

  1. Configure credentials and create engine

    from sqlalchemy import create_engine
    sqlserver = 'myserver'
    database = 'db'
    username = 'admin'
    password = 'password'
    connection_string = ('postgresql+psycopg2://{username}:{pwd}@{server}:5432/{db}'
                         .format(username=username, pwd=password, server=sqlserver, db=database))
    # Create engine to connect to the database
    engine = create_engine(connection_string, connect_args={'sslmode': 'require'})

  2. Writing DataFrame to Database

    df = pd.DataFrame()
    # if_exists='replace' overwrites the existing table
    df.to_sql(
       '{table_name}',
       engine,
       if_exists='replace',
       index=False,
    )

  3. Be sure the PGSQL database has Allow access to Azure services enabled; the setting can be found under Connection Security. This provides access for the Batch service to connect to the database.
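
A minimal Write script tying these steps together might look like the sketch below: it lists every csv in the container, loads each one into a dataframe, and writes it to a table named after the file. The table-naming convention and the credential placeholders are assumptions.

    import pandas as pd
    from io import StringIO
    from azure.storage.blob import BlobServiceClient
    from sqlalchemy import create_engine

    # Placeholders - replace with your own values
    connection_string = '{connection_string}'
    container_name = '{container_name}'
    db_connection = 'postgresql+psycopg2://{username}:{pwd}@{server}:5432/{db}'

    blob_service_client = BlobServiceClient.from_connection_string(connection_string)
    container_client = blob_service_client.get_container_client(container_name)
    engine = create_engine(db_connection, connect_args={'sslmode': 'require'})

    # Read every uploaded csv and write it to a table named after the file
    for blob in container_client.list_blobs():
        if not blob.name.endswith('.csv'):
            continue
        downloaded_blob = container_client.download_blob(blob.name)
        df = pd.read_csv(StringIO(downloaded_blob.content_as_text()))
        table_name = blob.name[:-4]   # e.g. 'extracted_data.csv' -> table 'extracted_data'
        df.to_sql(table_name, engine, if_exists='replace', index=False)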

Automating using Data Factory

The pipeline is essentially complete and now it's time to automate it.

  1. In the Data Factory UI create a new pipeline by clicking +
  2. From the left menu drag and drop the Batch Service activity onto the canvas and link your Batch account
  3. Under the Settings tab link the Storage Account and the container that holds the script you want to run
  4. In the Command text box you can execute the script with:

    # Depending on the Python version
    python3.6 MyScript.py

  5. Repeat steps 2 - 4 for each script you want to run
  6. You can arrange the order you want to run each script
  7. Be sure to add the Write script at the end
  8. To schedule the pipeline to run, in the Data Factory UI select Trigger at the top, then New/Edit
  9. You can click Debug to test your scripts
  10. In the next section we can monitor the logs for each script by using the respective Run ID

ETL Script 1 > ETL Script 2 > Write to PGSQL

Monitoring Tasks

To monitor your scripts' performance you can use Azure Batch Explorer. Batch Explorer allows you to manage your batch accounts, understand task runtimes, and find issues within your scripts. You can download it here.

  1. Open Batch Explorer
  2. On the left tab select Jobs and click the Data Factory job
  3. Copy the Run ID from the previous section and paste it into the Tasks text box
  4. You can now view the logs in stdout.txt and stderr.txt