PySpark safely on Azure Data Lake Store and Azure Blob Storage

Hi, I’m working on several projects that require access to cloud storage (in this case Azure Data Lake Store and Azure Blob Storage) from pyspark running on Jupyter, while avoiding that all the Jupyter users access these storage accounts with the same credentials stored inside the core-site.xml configuration file of the Spark cluster.

I started my investigation by looking at the SparkSession that comes with Spark 2.0, especially at commands like spark.conf.set("spark.sql.shuffle.partitions", 6), but I discovered that these commands do not work at the Hadoop settings level; they are limited to the Spark runtime parameters.
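For example, in a Jupyter pyspark session where spark is the Spark 2.0 SparkSession, a call like the sketch below only changes a Spark SQL runtime parameter; as noted above, it does not touch the Hadoop-level keys (such as fs.azure.* or dfs.adls.*) that the storage drivers read:

# Spark SQL runtime setting only: the Hadoop configuration is not modified
spark.conf.set("spark.sql.shuffle.partitions", 6)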

I then moved my attention to the SparkContext, and in particular to HadoopConfiguration, which seemed promising but is missing from the pyspark implementation…

Finally I was able to find this excellent Stack Overflow post that points out how to leverage the HadoopConfiguration functionality from pyspark.
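The idea is to reach the JVM-side Hadoop Configuration object through the py4j gateway that the SparkContext exposes. A minimal sketch of the pattern (the key and value below are just placeholders; the real ones come later in the post):

# Grab the org.apache.hadoop.conf.Configuration of this notebook's SparkContext
hadoop_conf = sc._jsc.hadoopConfiguration()
hadoop_conf.set("some.hadoop.key", "some-value")   # placeholder key/value
print(hadoop_conf.get("some.hadoop.key"))          # read it back to verify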

So in a nutshell you can have the core-site.xml defined as follows:

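Here is a minimal sketch of what a credential-free core-site.xml can look like (just an illustration: no fs.azure.account.key.*, fs.azure.sas.* or dfs.adls.oauth2.credential properties appear in the file, because they will be injected at runtime from each notebook):

<configuration>
  <!-- no storage credentials here: no account keys, SAS tokens or client secrets -->
</configuration>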

As you can see, we do not store any credentials here.

Let’s see how to access an Azure Storage blob container with a shared access signature (SAS) that can be created for one specific container (think of it as a folder), giving you a reasonably fine-grained security model on the Azure Storage account without sharing the Azure Blob Storage access keys.

If you love Python, here is some code that an admin can use to quickly generate a SAS token that lasts for 24 hours:

from azure.storage.blob import (
    BlockBlobService,
    ContainerPermissions
)
from datetime import datetime, timedelta

account_name = "ACCOUNT_NAME"
account_key = "ADMIN_KEY"
CONTAINER_NAME = "CONTAINER_NAME"

# Connect with the storage account key (admin only: this key never leaves the admin's hands)
block_blob_service = BlockBlobService(account_name=account_name, account_key=account_key)

# Generate a read-only SAS token for the container, valid for the next 24 hours
sas_token = block_blob_service.generate_container_shared_access_signature(
    CONTAINER_NAME,
    ContainerPermissions.READ,
    datetime.utcnow() + timedelta(hours=24),
)

print(sas_token)

You will obtain something like this:

sv=2015-04-05&st=2015-04-29T22%3A18%3A26Z&se=2015-04-30T02%3A23%3A26Z&sr=b&sp=rw&sip=168.1.5.60-168.1.5.70&spr=https&sig=Z%2FRHIX5Xcg0Mq2rqI3OlWTjEg2tYkboXr1P9ZUXDtkk%3D

You can refer to this link to understand the structure.

OK, now that the Azure Storage admin has provided us the signature, we can use this SAS token to safely access the files on the Azure Storage blob container directly:

 
# Register the SAS token for this container in the Hadoop configuration of this notebook's SparkContext
sc._jsc.hadoopConfiguration().set("fs.azure.sas.PUT_YOUR_CONTAINER_NAME.PUT_YOUR_ACCOUNT_NAME.blob.core.windows.net", "PUT_YOUR_SIGNATURE")

from pyspark.sql.types import *

# Load the data. We use the sample HVAC.csv file of the HDInsight samples
hvacText = sc.textFile("wasbs://PUT_YOUR_CONTAINER_NAME@PUT_YOUR_ACCOUNT_NAME.blob.core.windows.net/HVAC.csv")

# Create the schema
hvacSchema = StructType([
    StructField("date", StringType(), False),
    StructField("time", StringType(), False),
    StructField("targettemp", IntegerType(), False),
    StructField("actualtemp", IntegerType(), False),
    StructField("buildingID", StringType(), False)
])

# Parse the data in hvacText, skipping the header row
hvac = hvacText.map(lambda s: s.split(",")) \
    .filter(lambda s: s[0] != "Date") \
    .map(lambda s: (str(s[0]), str(s[1]), int(s[2]), int(s[3]), str(s[6])))

# Create a data frame
hvacdf = sqlContext.createDataFrame(hvac, hvacSchema)

# Register the data frame as a table to run queries against
hvacdf.registerTempTable("hvac")

# Read the registered table back through the Hive context and show it
from pyspark.sql import HiveContext
hive_context = HiveContext(sc)
bank = hive_context.table("hvac")
bank.show()
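Since we are on Spark 2.0, the same file can also be read with the DataFrame reader instead of parsing the RDD by hand. A quick sketch, assuming the notebook exposes the spark SparkSession (otherwise sqlContext.read works the same way), that the CSV has a header row and that Spark should infer the column types:

hvacdf2 = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv("wasbs://PUT_YOUR_CONTAINER_NAME@PUT_YOUR_ACCOUNT_NAME.blob.core.windows.net/HVAC.csv")
hvacdf2.show()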

The same idea can be applied to Data Lake Store. Assuming that you have your Data Lake credentials set up as described here, you can access Data Lake Store safely in this way:

 
# Set the OAuth2 (service principal) settings in the Hadoop configuration of this notebook's SparkContext only
sc._jsc.hadoopConfiguration().set("dfs.adls.oauth2.refresh.url", "https://login.microsoftonline.com/PUT_YOUR_TENANT_ID/oauth2/token")
sc._jsc.hadoopConfiguration().set("dfs.adls.oauth2.client.id", "PUT_YOUR_CLIENT_ID")
sc._jsc.hadoopConfiguration().set("dfs.adls.oauth2.credential", "PUT_YOUR_SECRET")

from pyspark.sql.types import *

# Load the data using the fully qualified adl:// path of your Data Lake Store account
hvacText = sc.textFile("adl://YOURDATALAKEACCOUNT.azuredatalakestore.net/Samples/Data/HVAC.csv")

# Create the schema
hvacSchema = StructType([
    StructField("date", StringType(), False),
    StructField("time", StringType(), False),
    StructField("targettemp", IntegerType(), False),
    StructField("actualtemp", IntegerType(), False),
    StructField("buildingID", StringType(), False)
])

# Parse the data in hvacText, skipping the header row
hvac = hvacText.map(lambda s: s.split(",")) \
    .filter(lambda s: s[0] != "Date") \
    .map(lambda s: (str(s[0]), str(s[1]), int(s[2]), int(s[3]), str(s[6])))

# Create a data frame
hvacdf = sqlContext.createDataFrame(hvac, hvacSchema)

# Register the data frame as a table to run queries against
hvacdf.registerTempTable("hvac")

# Read the registered table back through the Hive context and show it
from pyspark.sql import HiveContext
hive_context = HiveContext(sc)
bank = hive_context.table("hvac")
bank.show()
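A nice property of this approach is that the credentials live only in the Hadoop configuration of the SparkContext behind your own notebook kernel, so (assuming each Jupyter user runs their own SparkContext) they are not shared through the cluster-wide core-site.xml. You can always inspect what your session has set:

# Inspect this notebook's Hadoop configuration (get() returns None if a key is not set)
conf = sc._jsc.hadoopConfiguration()
print(conf.get("dfs.adls.oauth2.client.id"))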

Happy coding with pyspark and Azure!
