Skip to content

Latest commit

 

History

History
347 lines (289 loc) · 13 KB

README.md

File metadata and controls

347 lines (289 loc) · 13 KB

KeywordAnalysis

Word analysis, by domain, on the Common Crawl data set for the purpose of finding industry trends

Knowledge Sharing Interview: Using Open Data to Predict Market Movements https://youtu.be/qjlOMoAYKmg?t=134

Knowledge Sharing Paper: Using Open Data to Predict Market Movements https://education.emc.com/content/dam/dell-emc/documents/en-us/2017KS_Ravinder-Using_Open_Data_to_Predict_Market_Movements.pdf


Process

Specific Domain Data Capturing

Common Crawl NetApp data capturing (New Index - after 2013)

  1. Start one EC2 m4.xlarge instance with 30GB SSD volume
  2. SSH to the instance with user: ec2-user
sudo yum -y install git; 
git clone https://github.com/CI-Research/cdx-index-client
cd cdx-index-client
sudo pip install -r requirements.txt
./cdx-index-client.py -c CC-MAIN-2016-30 *.netapp.com --json
cat domain-* > CC-MAIN-2016-30_July_Netapp
cd ~
git clone https://github.com/CI-Research/CommonCrawlDocumentDownload
cd CommonCrawlDocumentDownload
sudo yum install java-1.8.0-openjdk-devel
sudo update-alternatives --config java
./gradlew check
cp ~/cdx-index-client/CC-MAIN-2016-30_July_Netapp ~/CommonCrawlDocumentDownload
mv CC-MAIN-2016-30_July_Netapp commoncrawl-CC-MAIN.txt
nohup ./gradlew downloadDocuments
  1. Uploade data to S3
cd ~/download
aws configure
aws s3 sync . s3://CommonCrawl/data/NetApp/CC-MAIN-2016-30_July_Netapp/

Wget NetApp data capturing (This is optional)

Run wget from laptop Linux virtual machine

wget -r -nc -np "http://www.netapp.com/"
FINISHED --2017-04-28 05:03:37--
Total wall clock time: 9h 14m 12s
Downloaded: 10255 files, 1.1G in 4h 51m 33s (67.6 KB/s)
after zip/unzip: 10,000 files, 1,429 folders, 1.11GB
zip -r NetApp_April_2017 www.netapp.com/
find . -type f -exec cat {} + > Netapp_April_2017.txt

Upload files to AWS S3 bucket for use.

Remove html tags

  1. Start 1 nodes AWS EMR (Advance config: Hadoop only, Network "EC2-Classic", Master "m1.large", Core "0")
  2. SSH to the instance: ec2-54-90-80-85.compute-1.amazonaws.com (change) user: hadoop
  3. sudo yum install -y git
  4. wget http://www-eu.apache.org/dist/maven/maven-3/3.5.2/binaries/apache-maven-3.5.2-bin.tar.gz
  5. tar zxvf apache-maven-3.5.2-bin.tar.gz
  6. sudo vi .bashrc
export MAVEN_HOME=/home/hadoop/apache-maven-3.5.2
export M2_HOME=/home/hadoop/apache-maven-3.5.2
export M2=/home/hadoop/apache-maven-3.5.2
export PATH=/home/hadoop/apache-maven-3.5.2/bin:$PATH
  1. source .bashrc
  2. git clone https://github.com/dkpro/dkpro-c4corpus
  3. aws s3 sync s3://CommonCrawl/data/NetApp/CC-MAIN-2016-30_July_Netapp/ /var/tmp/CC-MAIN-2016-30_July_Netapp 9a. aws s3 sync s3://CommonCrawl/data/StackOverflow/ /var/tmp/StackOverflow/
  4. run the dkpro-c4corpus-boilerplate project using the below 2 commands::
cd dkpro-c4corpus/dkpro-c4corpus-boilerplate/
mvn package
  1. mkdir /var/tmp/boiler
  2. Create script to process the file:: (replace name of the directory CC-MAIN to the one you are working on). For StackOverflow i used "/var/tmp/StackOverflow/*;"
vi boiler.sh
#!/bin/bash
for filename in /var/tmp/CC-MAIN*/*; do
    java -jar target/dkpro-c4corpus-boilerplate-1.0.1-SNAPSHOT.jar "$filename" "/var/tmp/boiler/$(basename "$filename" .txt)" false  
done
  1. chmod +x boiler.sh
  2. nohup ./boiler.sh
  3. cd /var/tmp/boiler
  4. aws s3 sync . s3://CommonCrawl/boilerplate/netapp/CC-MAIN-2016-30_July_Netapp/

Wordcount process

spark-shell

IBM Wordcount process:

val file = sc.textFile("s3://CommonCrawl/ibm_boiler")
val counts = file.flatMap(line => line.toLowerCase().replace(".", " ").replace(",", " ").split(" ")).map(word => (word, 1L)).reduceByKey(_ + _)
val sorted_counts = counts.collect().sortBy(wc => -wc._2)   // 1mins
sc.parallelize(sorted_counts.take(60000)).saveAsTextFile("s3://CommonCrawl/boilerplate/ibm_boiler _top60000")
sc.parallelize(sorted_counts).saveAsTextFile("s3://CommonCrawl/boilerplate/wordcount-ibm_bolier")

Netapp Wordcount process:

val file = sc.textFile("s3://CommonCrawl/boilerplate/netapp_boiler")
val counts = file.flatMap(line => line.toLowerCase().replace(".", " ").replace(",", " ").split(" ")).map(word => (word, 1L)).reduceByKey(_ + _)
val sorted_counts = counts.collect().sortBy(wc => -wc._2)    // 3mins
sc.parallelize(sorted_counts.take(20000)).saveAsTextFile("s3://CommonCrawl/top20000_netapp_boiler")
sc.parallelize(sorted_counts).saveAsTextFile("s3://CommonCrawl/wordcount-netapp_boiler")

Top 10 words

Word Count
4327791
the 2103578
0 1159355
to 1097568
and 1057336
a 856529
of 811647
for 737729
in 646580
ibm 623663

Dataframe, Dataset, Data source

Convert Text to Parquet, Spark 2.0 convert into parquet file in much more efficient than spark1.6.

[hadoop@ip-10-0-1-27 ~]$ aws s3 cp s3://CommonCrawl/netapp_boiler_top20000_np.csv /var/tmp
[hadoop@ip-10-0-1-27 ~]$ hdfs dfs -put /var/tmp/netapp_boiler_top20000_np.csv /user/hadoop/

Spark 1.4+

spark-shell --packages com.databricks:spark-csv_2.11:1.5.0
import org.apache.spark.sql.SQLContext
val sqlContext = new SQLContext(sc)
val df = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load("/user/hadoop/netapp_boiler_top20000_np.csv")
val selectedData = df.select("words", "count")
selectedData.write.format("com.databricks.spark.csv").option("header", "true").save("netappparquet.csv")

Remove Stop Words

The steps are

  1. remove punctuation, by replace "[^A-Za-z0-9\s]+" with "", or not include numbers "[^A-Za-z\s]+"
  2. trim all spaces
  3. lower all words
  4. remove stop words
aws s3 cp s3://CommonCrawl/netapp_boiler_top20000.txt /var/tmp
hdfs dfs -mkdir /user/hadoop/data/
hdfs dfs -put /var/tmp/netapp_boiler_top20000.txt /user/hadoop/data/
spark-shell
import org.apache.spark.ml.feature.StopWordsRemover
import org.apache.spark.sql.functions.split

// val reg = raw"[^A-Za-z0-9\s]+" // remove punctuation with numbers

val reg = raw"[^A-Za-z\s]+" // remove punctuation not include numbers
val lines = sc.textFile("/user/hadoop/netapp_boiler_top20000_np.csv").map(_.replaceAll(reg, "").trim.toLowerCase).toDF("line")
val words = lines.select(split($"line", " ").alias("words"))
val remover = new StopWordsRemover().setInputCol("words").setOutputCol("filtered")
val noStopWords = remover.transform(words)
remover.transform(words).show(15)

alt text


//val counts = noStopWords.select(explode($"filtered")).map(word =>(word, 1)).reduceByKey(_+_)

// from word -> num to num -> word
//val mostCommon = counts.map(p => (p._2, p._1)).sortByKey(false, 1)

//mostCommon.take(5)

//dataframe dump to csv
val stringify = udf((vs: Seq[String]) => s"""[${vs.mkString(",")}]""")
words.withColumn("words", stringify($"words")).write.csv("/netapp_filtered.csv")
hdfs dfs -get /netapp_filtered.csv .

Machine Learning Pipeline: TF-IDF and K-Means

Introducing the TF-IDF method for vectorizing a "bag of words"

TF: "Term Frequency"

  • normalized for the length of the document
  • hashed into a fixed-length set of buckets ("the hashing trick") so that we don't have an extremely high number of dimensions (count of all distinct tokens)
  • downside: there will be some hash collisions, where unrelated words get mapped to the same "dimension"

IDF: "Inverse Document Frequency"

  • Normalize word counts based on how frequently a word occurs in the corpus.
  • Logarithmic transformation so that words which occur in literally every document (100% or 1.0) get weighted down to 0 (ln 1)
  • Rare words are weighted heavily
  • Helpful where rare, technical vocabulary constitutes distinguishing features

In spark 2.0, Spark has made csv a built-in source. We can create Dataframes from csv file.

sudo yum install -y git

git clone https://github.com/phatak-dev/spark-two-migration

aws s3 cp s3://CommonCrawl/netapp_boiler_top20000_np.csv /var/tmp

hdfs dfs -put /var/tmp/netapp_boiler_top20000_np.csv /user/hadoop/

spark-shell

import org.apache.spark.ml.feature.StopWordsRemover
import org.apache.spark.sql.functions.split
import org.apache.spark.sql.types._
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}
val sqlContext = new SQLContext(sc)
val netappDF = sqlContext.read.format("csv").option("header", "true").load("netapp_boiler_top20000_np.csv")
netappDF.columns
netappDF.show(15)
netappDF.printSchema()
netappDF.select("count").show()
netappDF.select($"words", $"count").show()
netappDF.filter($"count" > 10000).show()
netappDF.groupBy("count").count().show()
netappDF.groupBy("words").count().show()
//try sql query to display specific word
//netappDF.createOrReplaceTempView("netappsql")
//val sqlDF = spark.sql("SELECT words, count FROM netappsql WHERE words = 'database'".show(20) 

Lower case the text:

val netappLoweredDF = netappDF.select($"*", lower($"words").as("lowerText"))
netappLoweredDF.show(2)

Set up the ML Pipeline:

import org.apache.spark.ml.feature.{RegexTokenizer, StopWordsRemover, HashingTF, IDF, Normalizer}

Step 1: Natural Language Processing: RegexTokenizer: Convert the lowerText col to a bag of words

val tokenizer = new RegexTokenizer().setInputCol("lowerText").setOutputCol("netappwords").setPattern("""\W+""")
val netappWordsDF = tokenizer.transform(netappLoweredDF.na.drop(Array("lowerText")))
netappWordsDF.printSchema
netappWordsDF.select("netappwords").first

Step 2: Natural Language Processing: StopWordsRemover: Remove Stop words

val remover = new StopWordsRemover().setInputCol("netappwords").setOutputCol("noStopWords")
val noStopWordsListDF = remover.transform(netappWordsDF)
noStopWordsListDF.printSchema
noStopWordsListDF.select("words", "count", "netappwords", "noStopWords").show(20)
noStopWordsListDF.show(15)

alt text

alt text

Step 3: HashingTF// More features = more complexity and computational time and accuracy

val hashingTF = new HashingTF().setInputCol("noStopWords").setOutputCol("hashingTF").setNumFeatures(20000)
val featurizedDataDF = hashingTF.transform(noStopWordsListDF)
featurizedDataDF.printSchema
featurizedDataDF.select("words", "count", "netappwords", "noStopWords").show(7)

Step 4: IDF// This will take 30 seconds or so to run

val idf = new IDF().setInputCol("hashingTF").setOutputCol("idf")
val idfModel = idf.fit(featurizedDataDF)

Step 5: Normalizer// A normalizer is a common operation for text classification. // It simply gets all of the data on the same scale... for example, if one article is much longer and another, it'll normalize the scales for the different features. // If we don't normalize, an article with more words would be weighted differently

val normalizer = new Normalizer().setInputCol("idf").setOutputCol("features")

Step 6: k-means & tie it all together...

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.clustering.KMeans
val kmeans = new KMeans().setFeaturesCol("features").setPredictionCol("prediction").setK(8).setSeed(0) 
// for reproducability
val pipeline = new Pipeline().setStages(Array(tokenizer, remover, hashingTF, idf, normalizer, kmeans))
// This can take more 1 hour to run!/*
val model = pipeline.fit(netappLoweredDF.na.drop(Array("lowerText")))
*/

Prediction

aws s3 cp s3://CommonCrawl/ibm_boiler_top60000.csv /var/tmp

hdfs dfs -put /var/tmp/ibm_boiler_top60000.csv /user/hadoop/

// val model2 = org.apache.spark.ml.PipelineModel.load("netapp_boiler_top20000_np.csv")
// val model2 = org.apache.spark.ml.PipelineModel.load("saves_parquet.csv")
// input path error

Let's take a look at a sample of the data to see if we can see a pattern between predicted clusters and titles.

val rawPredictionsDF = model.transform(netappLoweredDF.na.drop(Array("lowerText")))
rawPredictionsDF.columns
rawPredictionsDF.show(10)
val predictionsDF = rawPredictionsDF.select($"words", $"prediction").cache
predictionsDF.show(15)
// This could take up to 5 minutes.predictionsDF.groupBy("prediction").count().orderBy($"count" desc).show(100)
display(predictionsDF.filter("prediction = 3").select("words", "prediction").limit(30))
display(predictionsDF.filter("prediction = 4").select("words", "prediction").limit(30))
display(predictionsDF.filter("prediction = 2").select("words", "prediction").limit(30))
predictionsDF.filter($"title" === "Apache Spark").show(10)
display(predictionsDF.filter("prediction = 25").limit(25))

alt text

alt text

alt text