Pyspark safely on Data Lake Store and Azure Storage Blob

Hi, I'm working on several projects that require access to cloud storage (in this case Azure Data Lake Store and Azure Blob Storage) from pyspark running on Jupyter, while avoiding that all the Jupyter users access these storages with the same credentials stored inside the core-site.xml configuration file of the Spark cluster.

I started my investigation by looking at the SparkSession that comes with Spark 2.0, and in particular at commands like spark.conf.set("spark.sql.shuffle.partitions", 6), but I discovered that these commands do not work at the Hadoop settings level: they are limited to the Spark runtime parameters.

I then moved my attention to the SparkContext, and in particular to its HadoopConfiguration, which seemed promising, but it is missing from the pyspark implementation…

Finally I was able to find this excellent Stack Overflow post that points out how to leverage the HadoopConfiguration functionality from pyspark.
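The trick, in short, is that the JVM SparkContext is reachable from pyspark through the private _jsc gateway. A minimal sketch of the access pattern (the property name below is a made-up placeholder, only the pattern matters):

# Minimal sketch: pyspark does not expose hadoopConfiguration() directly,
# but the underlying JVM SparkContext does, via the private _jsc gateway.
hadoop_conf = sc._jsc.hadoopConfiguration()

# "my.example.setting" is a hypothetical key, used here only to show the pattern.
hadoop_conf.set("my.example.setting", "some-value")
print(hadoop_conf.get("my.example.setting"))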

So, in a nutshell, the idea is to keep the core-site.xml of the cluster free of any storage credential: no Azure Storage account key and no Data Lake Store OAuth secret is defined there, and each user passes their own credentials at runtime.
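As a rough sketch (property names taken from the standard hadoop-azure / hadoop-azure-datalake connectors; your actual cluster file will differ), such a credential-free core-site.xml might only contain the filesystem and OAuth-provider wiring, without any key or secret:

<configuration>
  <!-- Sketch only: filesystem/OAuth wiring, no account key and no client secret. -->
  <property>
    <name>fs.adl.impl</name>
    <value>org.apache.hadoop.fs.adl.AdlFileSystem</value>
  </property>
  <property>
    <name>fs.AbstractFileSystem.adl.impl</name>
    <value>org.apache.hadoop.fs.adl.Adl</value>
  </property>
  <property>
    <name>dfs.adls.oauth2.access.token.provider.type</name>
    <value>ClientCredential</value>
  </property>
  <!-- No fs.azure.account.key.* and no dfs.adls.oauth2.credential entries here. -->
</configuration>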

Let's see how to access an Azure Blob Storage container with a shared access signature (SAS). A SAS can be created to grant access to a specific container (imagine it like a folder), giving you an almost fine-grained security model on the Azure Storage account without sharing the Azure Blob Storage access keys.

If you love Python, here is some code that an admin can use to quickly generate a SAS signature that lasts for 24 hours:

from datetime import datetime, timedelta

from azure.storage.blob import (
    BlockBlobService,
    ContainerPermissions
)

account_name = "ACCOUNT_NAME"
account_key = "ADMIN_KEY"
CONTAINER_NAME = "CONTAINER_NAME"

# Connect with the storage account key (admin side only).
block_blob_service = BlockBlobService(account_name=account_name, account_key=account_key)

# Generate a read-only SAS token for the container, valid for 24 hours.
sas_token = block_blob_service.generate_container_shared_access_signature(
    CONTAINER_NAME,
    ContainerPermissions.READ,
    datetime.utcnow() + timedelta(hours=24),
)

print(sas_token)

You will obtain something like this:

sv=2015-04-05&st=2015-04-29T22%3A18%3A26Z&se=2015-04-30T02%3A23%3A26Z&sr=b&sp=rw&sip=168.1.5.60-168.1.5.70&spr=https&sig=Z%2FRHIX5Xcg0Mq2rqI3OlWTjEg2tYkboXr1P9ZUXDtkk%3D

You can refer to this link to understand the structure (in short: sv is the storage service version, st and se are the start and expiry times, sr the resource type, sp the granted permissions, sip an optional IP range, spr the allowed protocol and sig the signature itself).

OK, now, once the Azure Storage admin provides us with the signature, we can use this SAS token to safely access the files in the Azure Blob Storage container directly:

 
# Pass the SAS token for the container to the Hadoop configuration,
# so that only this notebook/session can read that container.
sc._jsc.hadoopConfiguration().set("fs.azure.sas.PUT_YOUR_CONTAINER_NAME.PUT_YOUR_ACCOUNT_NAME.blob.core.windows.net", "PUT_YOUR_SIGNATURE")

from pyspark.sql.types import *

# Load the data. We use the sample HVAC.csv file of the HDInsight samples.
hvacText = sc.textFile("wasbs://PUT_YOUR_CONTAINER_NAME@PUT_YOUR_ACCOUNT_NAME.blob.core.windows.net/HVAC.csv")

# Create the schema
hvacSchema = StructType([
    StructField("date", StringType(), False),
    StructField("time", StringType(), False),
    StructField("targettemp", IntegerType(), False),
    StructField("actualtemp", IntegerType(), False),
    StructField("buildingID", StringType(), False)
])

# Parse the data in hvacText (skip the header row; BuildingID is the 7th column of HVAC.csv)
hvac = hvacText.map(lambda s: s.split(",")) \
               .filter(lambda s: s[0] != "Date") \
               .map(lambda s: (str(s[0]), str(s[1]), int(s[2]), int(s[3]), str(s[6])))

# Create a data frame
hvacdf = sqlContext.createDataFrame(hvac, hvacSchema)

# Register the data frame as a table to run queries against
hvacdf.registerTempTable("hvac")

# Query the temp table through a HiveContext and show the results
from pyspark.sql import HiveContext
hive_context = HiveContext(sc)
hvac_table = hive_context.table("hvac")
hvac_table.show()
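As a side note, on Spark 2.0+ the same read can be sketched with the DataFrame reader instead of parsing the RDD by hand (assuming the SparkSession is available as spark and the same fs.azure.sas.* setting shown above is in place):

# Minimal sketch, assuming Spark 2.0+ and the SAS configuration set above.
df = (spark.read
      .option("header", "true")        # HVAC.csv ships with a header row
      .option("inferSchema", "true")
      .csv("wasbs://PUT_YOUR_CONTAINER_NAME@PUT_YOUR_ACCOUNT_NAME.blob.core.windows.net/HVAC.csv"))
df.createOrReplaceTempView("hvac")
spark.sql("SELECT * FROM hvac LIMIT 10").show()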

The same idea can be applied to Data Lake Store. Assuming that you have your Data Lake credentials (service principal) set up as described here, you can access Data Lake Store safely in this way:

 
# Pass the service principal credentials of the current user to the Hadoop configuration.
sc._jsc.hadoopConfiguration().set("dfs.adls.oauth2.refresh.url", "https://login.microsoftonline.com/PUT_YOUR_TENANT_ID/oauth2/token")
sc._jsc.hadoopConfiguration().set("dfs.adls.oauth2.client.id", "PUT_YOUR_CLIENT_ID")
sc._jsc.hadoopConfiguration().set("dfs.adls.oauth2.credential", "PUT_YOUR_SECRET")
# If it is not already defined in core-site.xml, you may also need to set the provider type:
# sc._jsc.hadoopConfiguration().set("dfs.adls.oauth2.access.token.provider.type", "ClientCredential")

from pyspark.sql.types import *

# Load the data using the fully qualified adl:// URI of the Data Lake Store account
hvacText = sc.textFile("adl://YOURDATALAKEACCOUNT.azuredatalakestore.net/Samples/Data/HVAC.csv")

# Create the schema
hvacSchema = StructType([
    StructField("date", StringType(), False),
    StructField("time", StringType(), False),
    StructField("targettemp", IntegerType(), False),
    StructField("actualtemp", IntegerType(), False),
    StructField("buildingID", StringType(), False)
])

# Parse the data in hvacText (skip the header row; BuildingID is the 7th column of HVAC.csv)
hvac = hvacText.map(lambda s: s.split(",")) \
               .filter(lambda s: s[0] != "Date") \
               .map(lambda s: (str(s[0]), str(s[1]), int(s[2]), int(s[3]), str(s[6])))

# Create a data frame
hvacdf = sqlContext.createDataFrame(hvac, hvacSchema)

# Register the data frame as a table to run queries against
hvacdf.registerTempTable("hvac")

# Query the temp table through a HiveContext and show the results
from pyspark.sql import HiveContext
hive_context = HiveContext(sc)
hvac_table = hive_context.table("hvac")
hvac_table.show()
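If the job fails with an OAuth error, it can help to sanity-check the tenant id, client id and secret outside Spark first. A minimal sketch with the adal library (an assumption on my side, it is not required by the code above):

# Minimal sketch: verify the service principal can obtain a token for Data Lake Store.
# Requires the adal package (pip install adal); not needed by the Spark code above.
import adal

authority = "https://login.microsoftonline.com/PUT_YOUR_TENANT_ID"
resource = "https://datalake.azure.net/"   # resource URI for Azure Data Lake Store

context = adal.AuthenticationContext(authority)
token = context.acquire_token_with_client_credentials(resource, "PUT_YOUR_CLIENT_ID", "PUT_YOUR_SECRET")
print(token["tokenType"], "token acquired, expires on", token["expiresOn"])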

Happy coding with pyspark and Azure!

Cross-channel personalization with Adobe Marketing Cloud

This is a sample scenario that we want to reproduce:

Meet John. John wants to buy a new car. During his lunch break he uses his iPad to look at a manufacturer website and finds a model he is interested in.  He engages with the content but then his lunch break ends! He leaves the site and continues with his working day.

The following weekend, with more time on his hands, John returns to the website.  John is recognised as a returning visitor and the data held on him is interrogated. It is known that he did not sign up for the manufacturers newsletter on his previous visit so the landing page is personalised to prompt him to sign up.  Also, the vehicle that he was interested in is given prominence on the site.  John follows the prompts and signs up to the newsletter.

One week later John receives an e-mail, but he is busy with his work and he doesn’t open the e-mail before the evening. By this time he is really tired and doesn’t click through to the website from the newsletter. He puts down his iPad and goes to bed.

Based on John’s previous behaviour, the next newsletter is sent to him on the evening. John opens it and as it focuses on the vehicle he has already expressed an interest in, he clicks through to the website. The website recognises him and displays information about where and when he can test drive his preferred vehicle, along with an incentive to do so. John follows this link and books a test drive for the next day.

Source: https://www.cognifide.com/blogs/marketing-technology/cross-channel-personalization-with-adobe-marketing-cloud/#.Vk2HvGOFO70

 

These are the steps to perform all the actions described in the scenario:

  1. https://www.cognifide.com/blogs/technology/zen-journey—a-technical-view-on-the-adobe-marketing-cloud—part-1/#.Vk2IP2OFO70
  2. https://www.cognifide.com/blogs/technology/zen-journey—a-technical-view-on-the-adobe-marketing-cloud—part-2/#.Vk2IUGOFO70
  3. https://www.cognifide.com/blogs/technology/zen-journey—a-technical-view-on-the-adobe-marketing-cloud—part-3/#.Vk2IXWOFO70

Admittedly, the integrations are not super simple and they strongly leverage API calls between the various modules, but now you have an idea of how to implement this kind of personalization.

 

Indexing Xml files with elastic search and c#

Lately I've been struggling with some integration issues, and each time I had to reverse engineer workflows, troubleshoot code and search inside the logs of the enterprise service bus to find the message xyz containing the customer data of client abc. Since these logs are also stored in an encrypted format, I had to write some code to decrypt them on the fly, search inside the file contents, look inside the next log, and so on…

Basically, a single search for one customer was taking 20-30 minutes…

So I started to look at solutions like Elasticsearch or Solr that solve exactly this kind of problem, and since I had already worked with Elasticsearch in the past I went in that direction. The classic combo is Logstash & Elasticsearch & Kibana: Logstash is used to parse and transform the incoming log files and send them to Elasticsearch, where they are indexed, and with Kibana you can quickly build nice dashboards on the indexed content.

This time, however, I had to face a new challenge: instead of having classic web logs and ready-to-use Logstash transformations (filters), I had to work on these huge XML log files stored inside the ESB, which also had several levels of nesting. Elasticsearch natively supports indexing JSON objects, not XML, so you have to manipulate the XML with a Logstash transformation. After reading a bit about the Logstash xml filter I found that (probably because I did not spend much time on it) it would take too long to write the right transformation for my case.

So I wrote some C# code to do it, and I chose to leverage the NEST library (the Elasticsearch .NET client).

While looking into the NEST and Elasticsearch documentation I also discovered that objects nested inside other objects are not as easily searchable as the root ones. So I decided to flatten the XML structure into a flat, simple C# class. To write the minimum amount of code for this, I first transformed the XML into a proper C# class; the fastest way I found is to use xsd.exe from the Windows SDK (look in C:\Program Files\Microsoft SDKs\Windows\v6.0A\bin) and obtain the xsd file from a single XML document:

xsd "C:\Users\UserA\Desktop\ESB.xml" /o:"C:\Users\UserA\Desktop"

You will obtain ‘C:\Users\UserA\Desktop\ESB.xsd’.

Now use xsd.exe again to generate the c# class:

xsd /c "C:\Users\UserA\Desktop\ESB.xsd" /o:"C:\Users\UserA\Desktop"

You will obtain ‘C:\Users\UserA\Desktop\ESB.cs’.

I manually created the flat C# class by simply copying & pasting the nested object properties of the generated C# class into the flat one. Since the property names are not changed, we can later use reflection to automatically copy the property values from the nested objects to the flat one:

public static void ReplaceValues(Object source, Object destination)
{
    PropertyInfo[] propertiesIncoming = source.GetType().GetProperties(BindingFlags.Public | BindingFlags.Instance);
    PropertyInfo[] propertiesDestination = destination.GetType().GetProperties(BindingFlags.Public | BindingFlags.Instance);
    // This is sample code: do not iterate like this
    // and use a LINQ search with large collections!!!
    foreach (PropertyInfo p in propertiesIncoming)
    {
        if (p.PropertyType != typeof(string)) { continue; }
        PropertyInfo dest = propertiesDestination.Where(y => y.Name == p.Name).FirstOrDefault();
        if (dest != null)
        {
            dest.SetValue(destination, p.GetValue(source));
        }
    }
}

So, once we have the flat C# object, we can index it very quickly using the NEST client.
Here is a sample that takes one XML file and indexes its contents.

// ESBObj is the ESB.cs class type
XmlSerializer serializer = new XmlSerializer(typeof(ESBObj));

// Even with large xml files this deserialization happens really quickly
ESBObj resultingMessage = (ESBObj)serializer.Deserialize(new XmlTextReader(this.openFileDialog1.FileName));

// Here we use the NEST library and we connect to the local node
var node = new Uri("http://localhost:9200");

// We specify that we want to work on the esb_index
var settings = new ConnectionSettings(
    node,
    defaultIndex: "esb_index"
);

// Let's connect
var client = new ElasticClient(settings);

// Here we fill the flat objects using the ESBObj levels
FlatObj tempObj = null;
int progressive = 0;

// Sample code here, this can be largely improved using reflection again
foreach (var level1 in resultingMessage.Level1Objects)
{
    foreach (var level2 in level1.Level2Objects)
    {
        foreach (var level3 in level2.Level3Objects)
        {
            tempObj = new FlatObj();
            progressive++;
            ReplaceValues(resultingMessage, tempObj);
            ReplaceValues(level1, tempObj);
            ReplaceValues(level2, tempObj);
            ReplaceValues(level3, tempObj);

            // Before indexing we assign a progressive Id to each object
            // in order to have a unique id on Elasticsearch;
            // Elasticsearch uses this id to identify each object uniquely
            // in the index.
            tempObj.Id = progressive.ToString();

            // This is the indexing call
            var index = client.Index(tempObj);
        }
    }
}

Now we want to search for content in the index. However, this turned out to be trickier than I thought, probably because it was my first time using the NEST library. Luckily, I had also installed some Elasticsearch plug-ins, and one of these was ElasticHQ, a nice front-end for Elasticsearch. Looking at the JSON requests of the queries issued by ElasticHQ, I was able to find the right query to issue using NEST raw mode (where you pass the commands directly instead of letting the NEST library build them for you).

This is some sample code that "should" work, a search with City = "New York", but in my case it returned no results (most likely because the field is analyzed, so the exact term "New York" does not exist in the index as a single token):

var searchResults = client.Search<FlatObj>(s => s
                .From(0)
                .Size(50)
                .Query(q => q
                     .Term(p => p.City, "New York")
                )
                );

Here instead is how I made it work (and this way it automatically searches across all the properties!):


// In searchbox we type what we want to find:
// we can type anything here and Elasticsearch will search on all
// the flattened properties!!!
string searchVal = @"{""filtered"": {""query"": {""query_string"": {""query"": """ + this.searchbox.Text;
searchVal = searchVal + @"""}}}}";
var client = new ElasticClient(settings);
var searchResults = client.Search<FlatObj>(s => s
            .From(0)
            .Size(50)
            .QueryRaw(searchVal)
            );
// Since the Documents collection is IEnumerable we can bind it on the fly
// and see the results in a grid!
this.dataGridView1.DataSource = searchResults.Documents;
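For reference, the raw body assembled by the two string concatenations above, pretty-printed for a hypothetical search term such as acme, is simply:

{
  "filtered": {
    "query": {
      "query_string": {
        "query": "acme"
      }
    }
  }
}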

So in the end my quick POC indexed a 14 MB XML log file in 200 ms, and each search issued to the Elasticsearch node, over any possible content of the file, took 50-100 ms. Actually, only the index size scares me (5 MB), and I want to see how much it will grow with several files.

Marketing Segmentation & Predictive Analytics

Enterprises at any level need to target their consumers, clients and users with campaigns, measure the results of these campaigns and hopefully improve sales/contacts after each iteration. In practice this job requires a very broad spectrum of skills, starting from web/art/video design (how we format our message), going to language-specific skills (how we write our message to raise interest) and of course some black-magic art called segmentation (writing the right message to the right potential or actual customer). How do marketing people build segments? Well, using the attributes of the customers (age, sex, zip code, children, etc…) they can create segments (for example all young women living in NY without children) and create campaigns for them (discounted shirts? Why not!). Now, what is the role of predictive analytics in this area? Well, in theory it should help marketing people a lot:

  • Discovering clusters (segments?) in the customer base
  • Identifying the key features or influencers that lead to buying an item or taking some action
  • Showing which items are bought together and proposing them as new packages to offer
  • Using social data to identify leaders and followers in the customer base
  • …and surely another hundred insights like the ones mentioned.

Normally a marketing targeting tool leverages a classical database with tables, fields and records, and the segmentation result is actually "just" a SQL query whose "where" conditions identify the impacted customers. This tool is usually part of a suite; in other words, from a vendor you buy a package with it, and the package usually also includes a CMS to build your web site pages, an eCommerce platform to sell your stuff on the net, and an analytics/predictive analytics package to do the tasks described before.

Looking at this from a pure marketing department perspective, to maximize the value of your investments you should buy the package and enjoy the entire integrated platform without worries. It's a bit like when you buy an iPhone: you really start to enjoy it when you couple it with an Apple TV, a MacBook and lately an Apple Watch.

But while at home you can be the only one who makes this decision, in medium/large enterprises you have several good reasons to do exactly the opposite:

1) Predictive analytics runs on the largest possible combination of data, not only on the pure marketing/clicks/orders world.

2) It requires tools (big data clusters, in-memory engines, complex algorithms, etc.) that are much more sophisticated than the "normal" analytics capabilities provided by marketing packages. Usually it leverages best-in-class open source (R, Python, Scala libraries, etc.) or specific vendor software (SAS, IBM, etc., and lately several startups like Ayasdi).

3) The people working on it are miles away from marketing people, both from a capabilities perspective and from an objectives perspective (marketing usually wants analytics to prove its gut feelings, while analysts look at data with curiosity, trying to figure out patterns).

4) From an analytics standpoint, we usually want to buy software that we can reuse for all business cases (supply chain, logistics, operations, etc…) and not only for the marketing ones.

The usual "attempt to solve" this conflict is separation of duties: analysts discover insights, they translate these insights into customer features, and this information "should" fly to the segmentation tool so the good marketing guys can leverage it to run super-effective campaigns. The result? A huge amount of effort wasted without any ROI:

1) Marketing people do not own these features, so they do not trust them or they simply ignore them.

2) When you start the customization journey (new interfaces that transport data back & forth from the analytics world to the marketing planet) you will face bugs, data quality issues, data synchronization issues, etc…

3) As said, analysts try to cover multiple business cases, so only a fraction of their job actually targets marketing needs.

When you are a startup, however, all the complications of this approach are mitigated by the fact that your team is small and the marketing/analytics teams are in practice 2-3 people in the same room.

So, can we apply the startup model everywhere?

The key factor here is people and objectives. We can go with a complete end-to-end marketing & predictive analytics solution, integrated and customized, but we have to add to it a dedicated team that works on it: creative marketing and data-driven decisions can coexist and actually help each other if they are the result of a team of people laser-focused on the same business objectives.

For large enterprises, building this team is the key to providing these capabilities as a service to the various internal clients. Clients can be, for example, other marketing departments spread around the world, in different countries, working on different labels. These clients benefit because they can really focus on their business at the regional level and obtain not only an "agency-like" service but also the entire chain of experience and results that data flowing at the global level can bring.