/** This is the code for generating the three itemsets*/
// Raw input: one line of space-separated items per record.
val lines = sc.textFile("hdfs://master:9000/isom3370/product/browsingdata.txt")
// Items that occur at least 100 times over the whole input, in sorted order.
// Only the item names are kept; the occurrence counts are dropped at the end.
val counts = lines
  .flatMap(line => line.split(' '))
  .map(item => (item, 1))
  .reduceByKey((left, right) => left + right)
  .filter { case (_, occurrences) => occurrences >= 100 }
  .sortByKey()
  .keys
// Candidate pairs: every (a, b) built from those items with a < b, sorted by a.
val pairs = counts
  .cartesian(counts)
  .filter { case (first, second) => first < second }
  .sortByKey()
//leak 37.1 MB memory from org.apache.spark.util.collection.ExternalSorter@1404a662
// For every input line: sort its items and list all 2-item combinations.
val lines_combinations = lines.map(t => t.split(" ").sortWith(_ < _).combinations(2).toArray)
// e.g. Array(Array(Array(ELE17451, ELE89019),...
// Flatten to one 2-element array per (line, pair) occurrence.
val all_combinations = lines_combinations.flatMap(x => x)
// e.g. Array(Array(ELE17451, ELE89019),...
// Turn each 2-element array into a tuple; combinations(2) only yields arrays of length 2.
val all_combinations2 = all_combinations.map({case Array(x1, x2) => (x1,x2)})
// e.g. Array((ELE17451,ELE89019),...
// Keep only the pairs that are candidate pairs, via a double set difference.
// `subtract` compares the whole (item, item) tuple. The previous `subtractByKey`
// compared only the first item, so the second item of a pair was never checked
// against the candidates. This now matches how the triples are pruned further down.
val all_combinations3 = all_combinations2.subtract(pairs)
val all_combinations4 = all_combinations2.subtract(all_combinations3)
// Count how often each surviving pair occurs and keep those seen at least 100 times.
// The result holds the pairs only, sorted; the counts are dropped.
val all_combinations5 = all_combinations4
  .map(pair => (pair, 1))
  .reduceByKey(_ + _)
  .filter { case (_, occurrences) => occurrences >= 100 }
  .sortByKey()
  .keys
// The threshold was already applied above and the counts are gone, so there is
// nothing left to filter on here (the old `m._2 >= 100` compared a String with an
// Int). The frequent pairs are passed through unchanged as (item, item) tuples,
// which is the shape the triple generation reads.
val final_match_combinations = all_combinations5
// Combine every frequent pair with every other one, keeping a couple only when
// the left pair sorts strictly before the right pair (by first item, then second).
val triplet_combinations = final_match_combinations
  .cartesian(final_match_combinations)
  .filter { case ((leftHead, leftTail), (rightHead, rightTail)) =>
    leftHead < rightHead || (leftHead == rightHead && leftTail < rightTail)
  }
// Of those, keep the couples whose two pairs share the same first item.
val triplet_combinations_filter = triplet_combinations.filter {
  case ((leftHead, _), (rightHead, _)) => leftHead == rightHead
}
// Merge (a, b) and (a, c) into the candidate triple (a, b, c); b < c follows from the filters above.
val triplet_combinations_map = triplet_combinations_filter.map {
  case ((shared, second), (_, third)) => (shared, second, third)
}
// For every input line: sort its items and list all 3-item combinations.
val triple_lines_combinations = lines.map { line =>
  line.split(" ").sorted.combinations(3).toArray
}
// Flatten to one 3-element array per (line, triple) occurrence.
val triple_lines_combinations2 = triple_lines_combinations.flatMap(perLine => perLine.iterator)
// Turn each 3-element array into a tuple; combinations(3) only yields arrays of length 3.
val triple_lines_combinations3 = triple_lines_combinations2.map {
  case Array(first, second, third) => (first, second, third)
}
// Double set difference: first the line triples that are NOT candidate triples,
// then everything except those, i.e. the line triples that ARE candidates.
val sub_triplet = triple_lines_combinations3.subtract(triplet_combinations_map)
val triple_final = triple_lines_combinations3.subtract(sub_triplet)
// Number of occurrences of each remaining triple, as (triple, count).
val finish = triple_final
  .map(triple => (triple, 1))
  .reduceByKey((left, right) => left + right)
// Confidence of the rule "item => the other two items" for every counted triple:
//   confidence = (occurrences of the triple) / (occurrences of the item)
// Each record is ((item, triple), confidence), sorted by ascending confidence.
//
// The previous expression was not valid Scala (free-standing `foreach`, `._2` called
// on RDDs, `sortBy(._2)`), and `counts` no longer carries the per-item counts, so the
// item totals are recomputed from `lines` here.
// NOTE(review): a single-item antecedent is assumed because the original divided by
// the single-item counts. If the intended rules are (pair => item), divide by the
// pair counts instead. Confirm the intended rule shape and the sort direction.
val confidence = {
  // Occurrences of every item over the whole input, counted the same way as `counts`.
  val itemOccurrences = lines
    .flatMap(line => line.split(' '))
    .map(item => (item, 1))
    .reduceByKey(_ + _)

  finish
    .flatMap { case (triple, tripleCount) =>
      val (a, b, c) = triple
      // One record per item of the triple, keyed by that item for the join below.
      Seq(a, b, c).map(item => (item, (triple, tripleCount)))
    }
    .join(itemOccurrences)
    .map { case (item, ((triple, tripleCount), itemCount)) =>
      // toDouble: integer division would truncate every ratio below 1 to 0.
      ((item, triple), tripleCount.toDouble / itemCount)
    }
    .sortBy(_._2)
}
confidence.saveAsTextFile("hdfs://master:9000/isom3370/product/output2")
//To access output files
//Hadoop setup
// $ hadoop fs -mkdir -p /isom3370/product
// $ hadoop fs -copyFromLocal browsingdata.txt /isom3370/product/
// $ hadoop fs -ls /isom3370/product/browsingdata.txt
//Output files
// $ hadoop fs -cat /isom3370/product/output2/part-00000
Be the first to comment
You can use [html][/html], [css][/css], [php][/php] and more to embed the code. Urls are automatically hyperlinked. Line breaks and paragraphs are automatically generated.