November 27, 2019

Enhancing Digital Twins Part 4: Writing Databricks' Predictive Maintenance Results to Blob

By

Theta

In this post, we will detail how we committed our results from part 3 to Azure Blob Storage so it's accessible to our Digital Twins project.

In part 4 of 4, we detail how we committed our Predictive Maintenance results to Azure Blob Storage so they're accessible to our Digital Twin project. In part 1 of this series, we introduced the concept of Predictive Maintenance in Digital Twins. In part 2, we set up the Databricks environment to prepare for Predictive Maintenance analyses. In part 3, we detailed the R analyses we conducted on our Predictive Maintenance dataset from part 2 to help our Digital Twins project make better-informed suggestions.

1. Forming JSON output from R analyses (R)

There are myriad ways to construct JSON in R. We chose base R's sprintf function, which returns a character vector formatted according to our predefined output template.

Build up the JSON structure starting from the most deeply nested objects.

 
# First nested object
overview_output <- sprintf('{"totalAssets":"%s",
                          "totalBroken":"%s",
                          "totalInService":"%s",
                          "totalUrgent":"%s",
                          "totalMedium":"%s",
                          "totalGood":"%s"
                          }', totalBranchA, total_Broken_branchA, total_Working_branchA, urgent, medium, good)
 

Then gradually build the final object level by level.

 

# Parent object
# Note the %s placeholder for the nested object isn't wrapped in quotation marks,
# since it expands to a JSON object rather than a string value
json_output <- sprintf('{"overview":%s
                          }', overview_output)

 


We suggest intermittently printing out the resulting JSON string and checking it against a JSON validator to ensure the JSON is well formed.

 

# Final JSON output
{
  "overview": {
    "totalAssets": "336",
    "totalBroken": "123",
    "totalInService": "213",
    "totalUrgent": "0",
    "totalMedium": "7",
    "totalGood": "206"
  }
}

 

2. Writing JSON output to Databricks Clusters’ driver node (R)

Since we chose to write to Azure Blob Storage using Python, we need to store the resulting JSON output in a location accessible across different cells, regardless of language. To do this, we enlisted base R's write function, which can save the JSON string to a file at a path of your choice.

We favoured saving to the Databricks default location. You can use this Python script to find out where Databricks saves by default:

 

%python
# Find the path where files are saved by default
import json
import os

data = {"hello": "test"}

# Write a small test file using a relative path
with open('filenameWithExtension', 'w') as outfile:
    json.dump(data, outfile)

# Resolve the absolute directory of the file we just wrote
print(os.path.dirname(os.path.realpath('filenameWithExtension')))  # "/databricks/driver"
 

We found that Databricks saves to '/databricks/driver/' by default, which sits on the driver node of our cluster. The driver node coordinates the worker nodes that fetch and pull data from various sources, so you should ensure your driver has enough memory to handle your read/write operations.

You could also consider reading from and writing to the Databricks File System (DBFS) instead.
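
If you go that route, a convenient option is the /dbfs FUSE mount available on standard clusters, which lets ordinary file I/O land in DBFS rather than on the driver's local disk. A minimal sketch, where the /tmp path is purely illustrative:

%python
import json

# Anything written under /dbfs/ is stored in DBFS rather than on the driver's local disk
# (the /tmp/fileName.json path is just an illustration)
data = {"hello": "test"}
with open('/dbfs/tmp/fileName.json', 'w') as outfile:
    json.dump(data, outfile)

# The same file is then visible to dbutils under the dbfs:/ scheme
dbutils.fs.ls("dbfs:/tmp/")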

Using the write function, we wrote the JSON output to the location Databricks saves to by default.

 

# Write the JSON string to the default Databricks save location on the driver node
write(json_output, "/databricks/driver/fileName.json")
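
At this point you can also sanity-check the written file from a Python cell before pushing it to Blob Storage; json.load will raise an error if the JSON is malformed. The file name below matches the one written above.

%python
import json

# Raises a ValueError if the file isn't well-formed JSON
with open('/databricks/driver/fileName.json') as json_file:
    json.load(json_file)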
 

3. Writing JSON output to Azure Blob Storage (Python)

We will now be writing our notebook cells in Python, so remember to prefix each Python cell with ‘%python’ if Python isn't your notebook's default language.

Before writing to a Blob container, you first have to create one inside an Azure Storage Account. We recommend doing this through the Azure Portal as it's quick and easy. Keep note of the storage account name, container name and access key, as you'll need them later.

We can access our Blob container using Spark and the Databricks APIs, which are available from Python.

Form a connection to your Azure Blob Container as below, replacing the placeholders with your own credentials.  

 

%python
storage_name = "yourBlobStorageName"
storage_access_key = "yourBlobAccessKey"

# Register the storage account access key so Spark can authenticate against Blob Storage
spark.conf.set(
  "fs.azure.account.key.{}.blob.core.windows.net".format(storage_name),
  storage_access_key)

 

Spark saves to folders rather than single files, so formulate your Blob Storage path so it points to a folder instead of the root of the Storage Account container. It should look something like: ‘wasbs://yourContainerName@yourBlobStorageName.blob.core.windows.net/yourStorageFolderName’

 

%python
storage_name = "yourBlobStorageName"
output_container_name = "yourContainerName"
blob_folder = "yourStorageFolderName"

# Construct the wasbs:// path to the folder inside the container
output_container_path = "wasbs://{}@{}.blob.core.windows.net".format(output_container_name, storage_name)
output_blob_folder = "{}/{}".format(output_container_path, blob_folder)
 

We chose to harness Spark's DataFrameWriter API to write our JSON output to a blob. The DataFrameWriter API allows us to write DataFrames to external storage systems via its save method.

Before this step, we have to convert our JSON file into a DataFrame. We load the JSON output from its path and first transform it into a Pandas DataFrame, as Pandas doesn't require us to specify a schema for a list of objects.

As it is now a Pandas DataFrame, we can then easily convert it to a Spark DataFrame, ready for Spark's save method.

 

%python
import pandas as pd
import json
import os

json_output = "yourJsonOutputFileNameWithExtension"

# Load the JSON output written to the driver node earlier
dir_path = os.path.dirname(os.path.realpath(json_output))
with open(dir_path + '/' + json_output) as json_file:
    data = json.load(json_file)

# Wrap the object in a list so Pandas builds a single-row DataFrame
df = pd.DataFrame([data])

# Convert the Pandas DataFrame into a Spark DataFrame for DataFrameWriter
ddf = spark.createDataFrame(df)
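
Putting it together, a minimal sketch of the write, using the ddf and output_blob_folder variables from the cells above:

%python
# Write the Spark DataFrame out as JSON to the Blob folder,
# overwriting whatever is already there
ddf.write.mode("overwrite").format("json").save(output_blob_folder)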

 

The mode parameter specifies the behavior of the save operation in case the data already exists. The ‘overwrite’ mode best suited our needs, but you can also ‘append’ the data, ‘ignore’ the operation or throw an ‘error’.

The path parameter must be a path to a Hadoop-supported file system, a condition which Azure Blob Storage satisfies. Once this is executed, the blob will be written to with your JSON output.

4. Cleaning up Azure Blob Storage after write (Python)

A strange effect of writing to Blob through Databricks is that it commits multiple unnecessary files alongside your JSON output. This happens on every write and will quickly crowd your Blob Storage. These unnecessary files come in a few distinct forms, and recognising them helped us eliminate them. The following shows the result you can expect from a single write to Blob Storage: five files are committed even though only the last one is necessary.

 

# The only necessary file is the last one
[
  FileInfo(path='wasbs://yourContainerName@yourBlobStorageName.blob.core.windows.net/yourStorageFolderName/_SUCCESS',
           name='_SUCCESS',
           size=0),
  FileInfo(path='wasbs://yourContainerName@yourBlobStorageName.blob.core.windows.net/yourStorageFolderName/_committed_5112101773895422549',
           name='_committed_5112101773895422549',
           size=216),
  FileInfo(path='wasbs://yourContainerName@yourBlobStorageName.blob.core.windows.net/yourStorageFolderName/_started_5112101773895422549',
           name='_started_5112101773895422549',
           size=0),
  FileInfo(path='wasbs://yourContainerName@yourBlobStorageName.blob.core.windows.net/yourStorageFolderName/part-00000-tid-5112101773895422549-6cc5c30a-c6e3-4688-a8b4-c477bb57379e-14-1-c000.json',
           name='part-00000-tid-5112101773895422549-6cc5c30a-c6e3-4688-a8b4-c477bb57379e-14-1-c000.json',
           size=0),
  FileInfo(path='wasbs://yourContainerName@yourBlobStorageName.blob.core.windows.net/yourStorageFolderName/part-00003-tid-5112101773895422549-6cc5c30a-c6e3-4688-a8b4-c477bb57379e-17-1-c000.json',
           name='part-00003-tid-5112101773895422549-6cc5c30a-c6e3-4688-a8b4-c477bb57379e-17-1-c000.json',
           size=394)
]
 

Note that the Blob name, formatted like ‘part-xxxxx-tid…’, is autogenerated on commit. We will change the Blob name later, alongside the Blob Storage cleanup. The unnecessary blobs fall into one of two categories: they are either of size 0 or contain ‘committed’ in their name. We will remove them according to these conditions.

In order to retrieve the committed blobs, we made use of Databricks' dbutils utility, whose file system methods (dbutils.fs) help us interact with Azure Blob Storage. Since we have already registered the storage account key with Spark, dbutils.fs can address our Blob folder directly through its wasbs:// path; alternatively, you can mount the folder onto the Databricks File System, where the mount acts as a pointer giving you access to said folder. However, we will first change the Blob name of our JSON output, so it's no longer prefixed with ‘part-xxxxx-tid…’, else we risk deleting it during cleanup.
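
If you do prefer a mount, a minimal sketch looks like the following; the /mnt/yourStorageFolderName mount point is purely illustrative and the other variables come from the earlier cells:

%python
# Optional: mount the Blob folder onto DBFS so it can also be reached at /mnt/...
dbutils.fs.mount(
  source="wasbs://{}@{}.blob.core.windows.net/{}".format(output_container_name, storage_name, blob_folder),
  mount_point="/mnt/yourStorageFolderName",
  extra_configs={"fs.azure.account.key.{}.blob.core.windows.net".format(storage_name): storage_access_key})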

Writing to Azure Blob Storage through Databricks is a pain point, as Databricks won't accept a blob name for the JSON output on the initial commit. We tried extending the path parameter in Spark's save method to include a blob name with its extension, and found it created a blob with the correct name but with no content.

 

# Note there is no content, as size = 0
[
  FileInfo(path='wasbs://yourContainerName@yourBlobStorageName.blob.core.windows.net/yourStorageFolderName/json_output.json/',
           name='json_output.json/',
           size=0)
]
 

A way to circumvent this is to rename the blob after its creation. This can be done using dbutils.fs.mv(fromPath, toPath) – Databricks Utility File System Move, which allows you to move a file or directory across file systems, parametrized by the source (from) and destination (to) paths of a blob.

First, we will list the entire Blob folder to retrieve the blob we created moments ago.

 

%python
# dbutils.fs.ls – Databricks Utility File System List; lists all items in a folder
files = dbutils.fs.ls(output_blob_folder)
 

Then we filter the list down to the files whose names start with ‘part-’ and take the path of the last item in the filtered list (the second item here), since that part file is the one that actually contains our JSON output.

This path will be the fromPath parameter in dbutils.fs.mv(fromPath, toPath). Construct your toPath as you see fit; we relocated the blob within the same folder, under the new blob name ‘json_output.json’.

 

%python
# dbutils.fs.mv(fromPath, toPath) – Databricks Utility File System Move;
# supports renaming and relocation of files

# Keep only the blobs whose names start with 'part-'
output_file = [x for x in files if x.name.startswith("part-")]

# The last part file (index 1 here) is your JSON output Blob;
# the new blob name is given by the second parameter
fileName = "newFileNameWithExtension"
dbutils.fs.mv(output_file[1].path, "%s/%s" % (output_blob_folder, fileName))


 

After executing this, you will have renamed (and/or relocated) the last part file, as re-running dbutils.fs.ls(output_blob_folder) shows:

 

[
  FileInfo(path='wasbs://yourContainerName@yourBlobStorageName.blob.core.windows.net/yourStorageFolderName/_SUCCESS',
           name='_SUCCESS',
           size=0),
  FileInfo(path='wasbs://yourContainerName@yourBlobStorageName.blob.core.windows.net/yourStorageFolderName/_committed_5112101773895422549',
           name='_committed_5112101773895422549',
           size=216),
  FileInfo(path='wasbs://yourContainerName@yourBlobStorageName.blob.core.windows.net/yourStorageFolderName/_started_5112101773895422549',
           name='_started_5112101773895422549',
           size=0),
  FileInfo(path='wasbs://yourContainerName@yourBlobStorageName.blob.core.windows.net/yourStorageFolderName/part-00000-tid-5112101773895422549-6cc5c30a-c6e3-4688-a8b4-c477bb57379e-14-1-c000.json',
           name='part-00000-tid-5112101773895422549-6cc5c30a-c6e3-4688-a8b4-c477bb57379e-14-1-c000.json',
           size=0),
  FileInfo(path='wasbs://yourContainerName@yourBlobStorageName.blob.core.windows.net/yourStorageFolderName/json_output.json',
           name='json_output.json',
           size=394)
]
 

Now we can remove the redundant files from Blob Storage. Relist the items in the folder and filter the files according to the aforementioned conditions. Files are removed using dbutils.fs.rm(blobPath) – Databricks Utility File System Remove.

 

%python
# Relist the items in the folder
files = dbutils.fs.ls(output_blob_folder)

# Remove files using dbutils.fs.rm(blobPath) – Databricks Utility File System Remove
# Remove a blob if its size is 0, its name starts with 'part-', or its name contains 'committed'
to_delete_file = [dbutils.fs.rm(x.path) for x in files if x.size == 0 or
                  x.name.startswith("part-") or "committed" in x.name]
 

Finally, if you re-execute dbutils.fs.ls(output_blob_folder), you should only see the renamed file in your Blob folder.

 

[
  FileInfo(path='wasbs://yourContainerName@yourBlobStorageName.blob.core.windows.net/yourStorageFolderName/json_output.json',
           name='json_output.json',
           size=394)
]
 

Please note that we are purposefully overwriting a single Blob, which might not be appropriate for your use case. In order not to lose the record of a previous analysis in Blob, you will have to make sure your toPath for dbutils.fs.mv(fromPath, toPath) is unique across the Blob Storage folder. Here are some suggested ways to create a unique Blob name:

  • You can keep the name (‘part-xxxxx-tid…’) which Databricks automatically assigns Blobs
  • You can count the number of Blobs in the Blob folder with dbutils.fs.ls(folderPath) before any write operations and attach this count to the end of the current Blob’s name. For example, a blob folder with initially 0 items will be written to next with a Blob named ‘json_output0.json’
  • You could also name the Blob with the date and time of the analysis by using Python’s datetime module, as sketched below. For example, a Blob might be named ‘2019-09-23 02:59:37.465105.json’
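
As an illustration of the date-time option, here is a minimal sketch that swaps a timestamped toPath into the rename step shown earlier (reusing output_file and output_blob_folder):

%python
from datetime import datetime

# Build a timestamped blob name so each analysis run keeps its own output
fileName = "json_output_{}.json".format(datetime.utcnow().strftime("%Y-%m-%d_%H-%M-%S"))
dbutils.fs.mv(output_file[1].path, "%s/%s" % (output_blob_folder, fileName))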

Now that we’ve committed our R analyses results from our Predictive Maintenance dataset to Azure Blob Storage, we can access the Blob from a greater number of applications.

Like we’d initially set out to do, we retrieved the JSON output Blob from Mixiply in order to visualise better asset maintenance suggestions and alerts in our warehouse’s digital twin.  


Warehouse’s Digital Twin asset monitoring board in Mixiply/AR

Overview of Warehouse’s monitoring boards in Mixiply/AR

And that's it for this series on enhancing Digital Twins with Predictive Maintenance.

We've shown that Digital Twins, with the help of AI and machine learning, could be used to predict and diagnose asset failures before they even happen. This is just the start as Predictive Maintenance is only one of many possible additions to digital twin technology. We can't wait to see what else you come up with.