Blog: Integrating machine learning into ETL processing for data cleansing

We have all heard the phrase, garbage in, garbage out (GIGO). For a data brokerage company trying to optimize the supply chain between durable goods manufacturers, parts suppliers, and repair shops, GIGO could spell disaster. In this post, we’ll show how we helped one of our customers integrate machine learning with neural networks into their Extract, Transform, and Load (ETL) process to fill in the gaps for data cleansing.

Using Google Search for gathering missing data doesn’t scale

Our customer owned an extensive amount of data. The main challenge was to aggregate data from different sources. Because the reports were entered by hand, they were inconsistent and contained blank entries. Missing details such as the durable goods model, year of manufacture, type, description of the repair, or the part’s manufacturer made any comprehensive forecast unreliable.

They were using a brute-force approach to complete the blank entries. This involved looking up parts information with Google Search and manually entering the data retrieved from the search results. Needless to say, this is not a scalable solution.

Processing missing data with neural networks

The good news: our customer had plenty of data we could use to train the neural network models. The following diagram shows our initial solution for providing predicted values for missing data.

Our data science team for the project continued using the Google Search process to help understand the problem domain, look at data trends, and investigate potential models for filling in the missing data. As part of the proof of concept (POC) phase, we used Azure Machine Learning, Azure Data Factory, Azure SQL Database, and Microsoft Power BI to identify patterns in the dirty data.
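
To get a quick picture of how much data was missing, a simple profiling pass of this kind can be run first. The sketch below is illustrative only: it assumes the reports have been exported to a pandas DataFrame, and the file name and column names are made up rather than taken from the customer’s schema.

import pandas as pd

# Quantify blank entries per column. The file and column names here are
# illustrative stand-ins, not the customer's actual schema.
reports = pd.read_csv("parts_reports.csv")              # hypothetical export of the report data
reports = reports.replace(r"^\s*$", pd.NA, regex=True)  # treat empty/whitespace-only cells as missing

missing = reports.isna().sum().sort_values(ascending=False).to_frame("missing_rows")
missing["missing_pct"] = 100 * missing["missing_rows"] / len(reports)

print(missing)  # e.g. model, year_of_manufacture, part_manufacturer, ...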

Using the power of Apache Spark and Databricks

Our team faced the challenge of processing a huge volume of data. Not only were the data volumes too big for a single machine’s RAM to handle, but processing that amount of data in the cloud would also be expensive and complicated. To overcome this, we leveraged Apache Spark, a distributed computing framework.

The neural network implementation had seven stages:

  1. At the preparatory stage, our developers consolidated the data stored in different tables into a single database.
  2. Then, to set up an environment for processing big data, we used Azure Databricks to provision a multi-node cluster and transferred the data to it.
  3. To use those servers as one computing system, we leveraged Apache Spark on Azure Databricks to preprocess the data (a minimal sketch follows this list).
  4. Next, we transferred the preprocessed data to the GPU cluster.
  5. At the next stage, the development team trained a model built on the PyTorch framework to predict the blank entries in the database.
  6. After model training was over, the team ran the prediction algorithm.
  7. Finally, our specialists transferred the trained model to the customer’s database, and voilà.
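
To make stage 3 more concrete, here is a minimal PySpark preprocessing sketch. The table name, column names, and steps are illustrative assumptions; the actual preprocessing on the customer’s Databricks cluster was considerably more involved.

from pyspark.sql import SparkSession, functions as F
from pyspark.ml.feature import StringIndexer

# On Databricks, a SparkSession already exists as `spark`; getOrCreate() reuses it.
spark = SparkSession.builder.getOrCreate()

# Hypothetical table and column names, for illustration only.
parts = spark.table("consolidated_parts_reports")

# Normalize blank strings to real nulls so missing values can be detected consistently
for col in ("model", "part_manufacturer", "repair_type"):
    parts = parts.withColumn(
        col, F.when(F.trim(F.col(col)) == "", F.lit(None)).otherwise(F.col(col))
    )

# Encode a categorical column to integer indices for the embedding layers downstream
parts = parts.fillna({"part_manufacturer": "unknown"})
indexer = StringIndexer(
    inputCol="part_manufacturer",
    outputCol="part_manufacturer_idx",
    handleInvalid="keep",   # unseen labels get a reserved index instead of failing
)
parts = indexer.fit(parts).transform(parts)

# Split rows with known labels (training data) from rows whose values we need to predict
train_rows = parts.where(F.col("model").isNotNull())
rows_to_predict = parts.where(F.col("model").isNull())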

The neural network-based solution addressed the data quality problem. Our approach made it possible to predict blank entries in the input data with high accuracy. Thus, our customer gained the opportunity to evaluate their business, build reliable analytical reports, and continue refining the machine learning ETL solution.

Training the neural networks

After we created the neural networks and prepared the data, we set about training them. Here, we used PyTorch CUDA tensors so tensor operations execute asynchronously on the GPU.

import torch

device = torch.device('cuda')              # Run training on the GPU

def fit(model, train_dl, val_dl, criterion, opt, scheduler, epochs=3):

    lr = [[] for _ in range(epochs)]       # Learning rate history, one list per epoch
    tloss = [[] for _ in range(epochs)]    # Training loss history, one list per epoch
    vloss = []                             # Validation loss history

    model = model.train()                  # Use training mode for the model
    for epoch in range(epochs):            # Loop through the entire dataset once per epoch

        running_loss = 0.0

        for cat, cont, y in train_dl:
            cat = cat.cuda()               # Load categorical features onto the GPU
            cont = cont.cuda()             # Load continuous feature columns onto the GPU

            y = y.to(device, dtype=torch.long) # Move the target labels onto the GPU
            y = y.reshape(1, -1)           # Reshape the labels into a single row
            y = y.squeeze()                # Drop dimensions of size 1 to get a 1-D tensor

            opt.zero_grad()                # Reset the gradients from the previous batch
            pred = model(cat, cont)        # Run the model to get the predicted category

            loss = criterion(pred, y)      # Compute the loss between predictions and targets
            loss.backward()                # Backpropagate to compute the gradients
            lr[epoch].append(opt.param_groups[0]['lr']) # Record the current learning rate
            tloss[epoch].append(loss.item())            # Record the training loss for this batch
            opt.step()                     # Tell the optimizer to update the parameters
            scheduler.step()               # Tell the scheduler to update the learning rate

            running_loss += loss.item()    # Add to the running loss for the current epoch

        if val_dl:                         # If a validation data loader was provided
            model.eval()                   # Switch to evaluation mode for validation

            correct = 0
            total = 0
            with torch.no_grad():          # No gradients are needed during validation
                for cat, cont, y in val_dl: # Loop through each validation batch
                    cat = cat.cuda()       # Load categorical features onto the GPU
                    cont = cont.cuda()     # Load continuous feature columns onto the GPU

                    y = y.to(device, dtype=torch.long) # Move the target labels onto the GPU
                    y = y.reshape(1, -1)   # Reshape the labels into a single row
                    y = y.squeeze()        # Drop dimensions of size 1 to get a 1-D tensor

                    outputs = model(cat, cont) # Run the model on the validation batch

                    vloss.append(criterion(outputs, y).item()) # Record the validation loss
                    _, predicted = torch.max(outputs, 1) # Take the highest-scoring category
                    total += y.size(0)     # Count the samples in this batch
                    correct += (predicted == y).sum().item() # Count the correct predictions

            val_accuracy = 100 * correct / total # Compute the validation accuracy as a percentage
            print(
                f'Epoch {epoch}: train_loss: {running_loss} | val_accuracy: {val_accuracy}')

            model.train()                  # Switch back to training mode for the next epoch

    return lr, tloss, vloss
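
For context, here is a minimal sketch of how a function like fit could be wired up end to end. The toy TabularNet model, the random data, the optimizer, and the OneCycleLR schedule are illustrative assumptions, not the customer’s actual configuration; the point is only to show the calling convention.

import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset

# Toy stand-in for the real tabular model: an embedding for the categorical
# feature plus a small MLP over the continuous features. All sizes and data
# below are illustrative, not the customer's setup.
class TabularNet(nn.Module):
    def __init__(self, n_cats=50, emb_dim=8, n_cont=4, n_classes=20):
        super().__init__()
        self.emb = nn.Embedding(n_cats, emb_dim)
        self.mlp = nn.Sequential(
            nn.Linear(emb_dim + n_cont, 64), nn.ReLU(), nn.Linear(64, n_classes)
        )

    def forward(self, cat, cont):
        x = torch.cat([self.emb(cat).flatten(1), cont], dim=1)
        return self.mlp(x)

# Random toy data shaped like (categorical index, continuous features, label)
cat = torch.randint(0, 50, (512, 1))
cont = torch.randn(512, 4)
y = torch.randint(0, 20, (512,))
train_dl = DataLoader(TensorDataset(cat, cont, y), batch_size=64, shuffle=True)
val_dl = DataLoader(TensorDataset(cat, cont, y), batch_size=64)

model = TabularNet().cuda()
criterion = nn.CrossEntropyLoss()
opt = torch.optim.Adam(model.parameters(), lr=1e-3)
scheduler = torch.optim.lr_scheduler.OneCycleLR(
    opt, max_lr=1e-2, epochs=3, steps_per_epoch=len(train_dl)
)

lr_history, train_loss, val_loss = fit(model, train_dl, val_dl, criterion, opt, scheduler)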

Running the prediction mechanism

The prediction function uses the same approach. We select the data we want to predict, run it through the trained model, apply a softmax function to the outputs, and take the highest-probability category as the prediction.

import torch
import torch.nn as nn

def partscat(m, dataset):
    pred_list = []              # Predicted category indices
    prob_list = []              # Probabilities of the predicted categories
    m = m.eval()                # Set model m to evaluation mode

    sm = nn.Softmax(dim=1)      # Maps raw outputs to probabilities in [0, 1] that sum to 1

    with torch.no_grad():       # No gradients are needed for inference
        for cat, cont, y in dataset: # Data set from SQL containing the rows with missing values

            cat = cat.cuda()    # Load categorical features onto the GPU
            cont = cont.cuda()  # Load continuous feature columns onto the GPU

            outputs = m(cat, cont)  # Run the model
            sm_output = sm(outputs) # Apply the softmax to the model outputs

            probability, predicted = torch.max(sm_output, 1) # Highest probability and its category

            pred_list.extend(predicted.tolist())   # Add the predicted categories to the list
            prob_list.extend(probability.tolist()) # Add the matching probabilities to the list

    return pred_list, prob_list # Return the predicted values and their probabilities
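
Once partscat returns, the predicted indices need to be mapped back to real category labels and written to the database. The sketch below is purely illustrative: the predictions, row IDs, label mapping, and confidence threshold are all made up; the actual write-back used the customer’s own tooling.

import pandas as pd

# Made-up outputs from partscat and the rows they correspond to.
pred_list = [3, 7, 3]
prob_list = [0.97, 0.64, 0.91]
rows_to_fill = pd.DataFrame({"row_id": [101, 102, 103]})

# Map predicted indices back to the labels used during preprocessing (illustrative mapping)
idx_to_label = {3: "Manufacturer A", 7: "Manufacturer B"}

rows_to_fill["predicted_value"] = [idx_to_label[i] for i in pred_list]
rows_to_fill["confidence"] = prob_list

# Only write back predictions above a chosen confidence threshold; flag the rest for review
confident = rows_to_fill[rows_to_fill["confidence"] >= 0.9]
print(confident)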

Learn more about how machine learning can improve your data quality

If you’re curious about how data science can prove useful for your business, contact us. Our specialists are always ready to analyze your system and suggest a data science solution worth implementing.
