Pyspark safely on Data Lake Store and Azure Storage Blob

Hi, I’m working on several projects that need to access cloud storage (in this case Azure Data Lake Store and Azure Blob Storage) from pyspark running on Jupyter, while avoiding that every Jupyter user accesses these storage accounts with the same credentials stored inside the core-site.xml configuration file of the Spark cluster.


I started my investigation by looking at the SparkSession introduced in Spark 2.0, and in particular at commands like spark.conf.set("spark.sql.shuffle.partitions", 6), but I discovered that these commands do not work at the Hadoop settings level: they are limited to the Spark runtime parameters.

I then moved my attention to the SparkContext, and in particular to its HadoopConfiguration, which seemed promising but is missing from the pyspark API…

Finally I was able to find this excellent Stackoverflow post that points out how to leverage the HadoopConfiguration functionality from pyspark.
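In short, the trick is to reach the JVM-side Hadoop configuration through the SparkContext’s internal _jsc gateway; every property you set this way lives only in your own Spark application, not in the cluster-wide core-site.xml (the property name below is just a placeholder):

# set any Hadoop property at runtime, only for the current Spark application
sc._jsc.hadoopConfiguration().set("some.hadoop.property", "some value")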

So, in a nutshell, you can have a core-site.xml defined along these lines:

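Here is a minimal sketch (assuming the standard hadoop-azure WASB client), with only the file system registration and no account keys:

<?xml version="1.0" encoding="UTF-8"?>
<configuration>
  <property>
    <name>fs.AbstractFileSystem.wasb.impl</name>
    <value>org.apache.hadoop.fs.azure.Wasb</value>
  </property>
</configuration>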

As you can see, we do not store any credentials here.

Let’s see how to access an Azure Blob Storage container with a shared access signature (SAS). A SAS can be created for a specific container (think of it as a folder), which gives you a reasonably fine-grained security model on the Azure Storage account without sharing the storage account access keys.

If you love Python, here is some code that an admin can use to quickly generate a SAS token that lasts 24 hours:

from datetime import datetime, timedelta

from azure.storage.blob import (
    BlockBlobService,
    ContainerPermissions
)

account_name = "ACCOUNT_NAME"
account_key = "ADMIN_KEY"
CONTAINER_NAME = "CONTAINER_NAME"

block_blob_service = BlockBlobService(account_name=account_name, account_key=account_key)

# Generate a read-only SAS token for the container, valid for 24 hours
sas_token = block_blob_service.generate_container_shared_access_signature(
    CONTAINER_NAME,
    ContainerPermissions.READ,
    datetime.utcnow() + timedelta(hours=24),
)

print(sas_token)

You will obtain something like this:

sv=2015-04-05&st=2015-04-29T22%3A18%3A26Z&se=2015-04-30T02%3A23%3A26Z&sr=b&sp=rw&sip=168.1.5.60-168.1.5.70&spr=https&sig=Z%2FRHIX5Xcg0Mq2rqI3OlWTjEg2tYkboXr1P9ZUXDtkk%3D

You can refer to this link to understand the structure.

Ok, now, once the Azure Storage admin provides us with the signature, we can use this SAS token to access the files in the Azure Blob Storage container directly and safely:

 
sc._jsc.hadoopConfiguration().set("fs.azure.sas.PUT_YOUR_CONTAINER_NAME.PUT_YOUR_ACCOUNT_NAME.blob.core.windows.net", "PUT_YOUR_SIGNATURE")

from pyspark.sql.types import *

# Load the data. We use the sample HVAC.csv file from the HDInsight samples
hvacText = sc.textFile("wasbs://PUT_YOUR_CONTAINER_NAME@PUT_YOUR_ACCOUNT_NAME.blob.core.windows.net/HVAC.csv")

# Create the schema
hvacSchema = StructType([
    StructField("date", StringType(), False),
    StructField("time", StringType(), False),
    StructField("targettemp", IntegerType(), False),
    StructField("actualtemp", IntegerType(), False),
    StructField("buildingID", StringType(), False)
])

# Parse the data in hvacText, skipping the header row
hvac = hvacText.map(lambda s: s.split(",")) \
    .filter(lambda s: s[0] != "Date") \
    .map(lambda s: (str(s[0]), str(s[1]), int(s[2]), int(s[3]), str(s[6])))

# Create a data frame
hvacdf = sqlContext.createDataFrame(hvac, hvacSchema)

# Register the data frame as a table to run queries against
hvacdf.registerTempTable("hvac")

# Query the table back through a HiveContext
from pyspark.sql import HiveContext
hive_context = HiveContext(sc)
hvac_table = hive_context.table("hvac")
hvac_table.show()
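Since we are on Spark 2.0, the same file can also be loaded with the DataFrame reader instead of parsing the RDD by hand. A sketch, assuming the SAS property has already been set as above and letting Spark infer the schema from the CSV header:

# read the CSV directly into a DataFrame, using the header row for column names
hvacdf = spark.read.csv(
    "wasbs://PUT_YOUR_CONTAINER_NAME@PUT_YOUR_ACCOUNT_NAME.blob.core.windows.net/HVAC.csv",
    header=True,
    inferSchema=True
)
hvacdf.show()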

The same idea can be applied to Data Lake Store. Assuming that you have your Data Lake credentials set up as described here, you can access Data Lake Store safely in this way:

 
sc._jsc.hadoopConfiguration().set("dfs.adls.oauth2.access.token.provider.type", "ClientCredential")
sc._jsc.hadoopConfiguration().set("dfs.adls.oauth2.refresh.url", "https://login.microsoftonline.com/PUT_YOUR_TENANT_ID/oauth2/token")
sc._jsc.hadoopConfiguration().set("dfs.adls.oauth2.client.id", "PUT_YOUR_CLIENT_ID")
sc._jsc.hadoopConfiguration().set("dfs.adls.oauth2.credential", "PUT_YOUR_SECRET")

from pyspark.sql.types import *

# Load the data from Data Lake Store
hvacText = sc.textFile("adl://YOURDATALAKEACCOUNT.azuredatalakestore.net/Samples/Data/HVAC.csv")

# Create the schema
hvacSchema = StructType([
    StructField("date", StringType(), False),
    StructField("time", StringType(), False),
    StructField("targettemp", IntegerType(), False),
    StructField("actualtemp", IntegerType(), False),
    StructField("buildingID", StringType(), False)
])

# Parse the data in hvacText, skipping the header row
hvac = hvacText.map(lambda s: s.split(",")) \
    .filter(lambda s: s[0] != "Date") \
    .map(lambda s: (str(s[0]), str(s[1]), int(s[2]), int(s[3]), str(s[6])))

# Create a data frame
hvacdf = sqlContext.createDataFrame(hvac, hvacSchema)

# Register the data frame as a table to run queries against
hvacdf.registerTempTable("hvac")

# Query the table back through a HiveContext
from pyspark.sql import HiveContext
hive_context = HiveContext(sc)
hvac_table = hive_context.table("hvac")
hvac_table.show()

Happy coding with pyspark and Azure!


Data scientists wanna have fun!

Hi everyone, yes I’m back!

This time we are going to set up a Big Data playground on Azure that can be really useful for any Python/pyspark data scientist.

Typically, what you get out of the box on Azure for this task is a Spark HDInsight cluster (i.e. Hadoop on Azure in Platform as a Service mode) connected to Azure Blob Storage (where the data is stored), running pyspark Jupyter notebooks.

It’s a fully managed cluster that you can start in a few clicks, and it gives you all the Big Data power you need to crunch billions of rows of data. Cluster node configuration, libraries, networking, etc. are all handled automatically, so you can focus on solving your business problems without worrying about IT tasks like “check if the cluster is alive, check if the cluster is ok, etc.”; Microsoft will do this for you.

Now, one key ask that data scientists have is “freedom!”: in other words, they want to install and update libraries and try new open source packages, but at the same time they don’t want to manage “a cluster” like an IT department.

In order to satisfy these two requirements we need some extra pieces in our playground and one key component is the Azure Linux Data Science Virtual Machine.

The Linux Data Science Virtual Machine is the Swiss Army knife for all data science needs; here you can get an idea of all the tasks you can accomplish with this product.

In this case I’m really interested in these capabilities:

  • It’s a VM so data scientists can add/update all the libraries they need
  • Jupyter and Spark are already installed on it so data scientists can use it to play locally and experiment on small data before going “Chuck Norris mode” on HDInsight

But there is something missing here… as a data scientist, I would love to work in one unified environment, accessing all my data and switching with a simple click from local to “cluster” mode without changing anything in my code or my configuration.

Hmmm… this seems impossible, some magic is needed here!

Wait a minute, did you say “magic”? I think we have that kind of magic :-), it’s spark magic!

In fact, we can use the local Jupyter and Spark environment by default and, when we need the power of the cluster, we can use sparkmagic to run the same code on the cluster, simply by changing the kernel of the notebook!

In order to complete the setup we need to do the following:

  1. Add to the Linux DS VM the ability to connect, via local Spark, to Azure Blob Storage (adding libraries, conf files and settings)
  2. Add sparkmagic to the Linux DS VM (adding libraries, conf files and settings) to connect from a local Jupyter notebook to the HDInsight cluster using Livy

Here are the detailed instructions:

Step 1: enable Azure Blob Storage access from your local Spark installation (ensure you run these commands as root):

cd $SPARK_HOME/conf
cp spark-defaults.conf.template spark-defaults.conf
cat >> spark-defaults.conf <<EOF
spark.jars                 /dsvm/tools/spark/current/jars/azure-storage-4.4.0.jar,/dsvm/tools/spark/current/jars/hadoop-azure-2.7.3.jar
EOF

If you don’t have a core-site.xml file in the $SPARK_HOME/conf directory, run the following:

cat >> core-site.xml <<EOF
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
  <property>
    <name>fs.AbstractFileSystem.wasb.impl</name>
    <value>org.apache.hadoop.fs.azure.Wasb</value>
  </property>
  <property>
    <name>fs.azure.account.key.YOURSTORAGEACCOUNT.blob.core.windows.net</name>
    <value>YOURSTORAGEACCOUNTKEY</value>
  </property>
</configuration>
EOF

Otherwise, just copy and paste the two <property> sections above into your existing core-site.xml file, replacing YOURSTORAGEACCOUNT and YOURSTORAGEACCOUNTKEY with the actual name and key of your Azure storage account.

Once you have done these steps, you should be able to access the blob from your Spark program using a wasb://YourContainer@YOURSTORAGEACCOUNT.blob.core.windows.net/YourBlob URL in the read API.
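A quick way to verify the setup from a local pyspark notebook (a sketch; replace the container, account and blob names with yours):

# count the lines of a text blob through the WASB connector configured above
rdd = sc.textFile("wasb://YourContainer@YOURSTORAGEACCOUNT.blob.core.windows.net/YourBlob")
print(rdd.count())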

Step 2: enable the local Jupyter notebook to run Spark code remotely on HDInsight (assuming the default Python is 3.5, as it comes on the Linux DS VM):

sudo /anaconda/envs/py35/bin/pip install sparkmagic

cd /anaconda/envs/py35/lib/python3.5/site-packages

sudo /anaconda/envs/py35/bin/jupyter-kernelspec install sparkmagic/kernels/pyspark3kernel

sudo /anaconda/envs/py35/bin/jupyter-kernelspec install sparkmagic/kernels/sparkkernel

sudo /anaconda/envs/py35/bin/jupyter-kernelspec install sparkmagic/kernels/sparkrkernel

 

In your /home/{YourLinuxUsername}/ folder:

  1. Create a folder called .sparkmagic and, inside it, a file called config.json
  2. Write in that file the configuration values of your HDInsight cluster (Livy endpoints and authentication) as described here:
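For reference, a minimal config.json looks roughly like this (a sketch: the field names follow sparkmagic’s example_config.json, so double-check them against the sparkmagic version you installed, and replace the cluster name, user and password with yours):

{
  "kernel_python_credentials": {
    "username": "admin",
    "password": "YOUR_CLUSTER_PASSWORD",
    "url": "https://YOURCLUSTER.azurehdinsight.net/livy"
  },
  "kernel_scala_credentials": {
    "username": "admin",
    "password": "YOUR_CLUSTER_PASSWORD",
    "url": "https://YOURCLUSTER.azurehdinsight.net/livy"
  },
  "kernel_r_credentials": {
    "username": "admin",
    "password": "YOUR_CLUSTER_PASSWORD",
    "url": "https://YOURCLUSTER.azurehdinsight.net/livy"
  }
}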

At this point, going back to Jupyter should allow you to run your notebooks against the HDInsight cluster using the PySpark3, Spark and SparkR kernels, and you can switch from local to remote kernel execution with one click!

Of course, some security features still have to be improved (passwords in clear text!), but the community is already working on this (see here the support for base64 encoding) and, of course, you can get the sparkmagic code from git, add the encryption support you need and contribute it back to the community!

Have fun with Spark and Spark Magic!

UPDATE: here are instructions on how to also connect to Azure Data Lake Store!

  1. Download this package and extract these two libraries: azure-data-lake-store-sdk-2.0.11.jar and hadoop-azure-datalake-3.0.0-alpha2.jar
  2. Copy these libraries to “/home/{YourLinuxUsername}/Desktop/DSVM tools/spark/spark-2.0.2/jars/”
  3. Add their paths to the spark.jars list inside spark-defaults.conf, as we did before (see the sketch after this list)
  4. Go here and, after you have created your AAD application, note down the Client ID, Client Secret and Tenant ID
  5. Add the following properties to your core-site.xml, replacing the placeholders with the values you obtained in the previous step:

<property>
  <name>dfs.adls.oauth2.access.token.provider.type</name>
  <value>ClientCredential</value>
</property>
<property>
  <name>dfs.adls.oauth2.refresh.url</name>
  <value>https://login.microsoftonline.com/{YOUR TENANT ID}/oauth2/token</value>
</property>
<property>
  <name>dfs.adls.oauth2.client.id</name>
  <value>{YOUR CLIENT ID}</value>
</property>
<property>
  <name>dfs.adls.oauth2.credential</name>
  <value>{YOUR CLIENT SECRET}</value>
</property>
<property>
  <name>fs.adl.impl</name>
  <value>org.apache.hadoop.fs.adl.AdlFileSystem</value>
</property>
<property>
  <name>fs.AbstractFileSystem.adl.impl</name>
  <value>org.apache.hadoop.fs.adl.Adl</value>
</property>
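For reference, after step 3 the spark.jars line in spark-defaults.conf will look roughly like this (a sketch: the first two jars are the ones added in Step 1, the last two are the ADLS jars copied in step 2; adjust the paths to wherever you actually placed them):

spark.jars   /dsvm/tools/spark/current/jars/azure-storage-4.4.0.jar,/dsvm/tools/spark/current/jars/hadoop-azure-2.7.3.jar,/home/{YourLinuxUsername}/Desktop/DSVM tools/spark/spark-2.0.2/jars/azure-data-lake-store-sdk-2.0.11.jar,/home/{YourLinuxUsername}/Desktop/DSVM tools/spark/spark-2.0.2/jars/hadoop-azure-datalake-3.0.0-alpha2.jar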