Research talk:Measuring edit productivity/Work log/2015-05-27

Wednesday, May 27, 2015


Work on Altiscale cluster

JSON Metadata Spark Job


Generated using /home/joal/refinery-job.jar:

/opt/spark/bin/spark-submit --verbose --master yarn --deploy-mode client \
--driver-memory 2G --executor-memory 2G --executor-cores 1 --num-executors 64 \
--driver-class-path $(find /opt/hadoop/share/hadoop/mapreduce/lib/hadoop-lzo-* | head -n 1) \
--class org.wikimedia.analytics.refinery.job.MediaWikiDumpXMLToJSONMetadata \
 /home/joal/refinery-job.jar /user/halfak/streaming/enwiki-20150304/xml-bz2 /user/joal/enwiki_20150304_json_metadata

The JSON data is stored in 500 files (about 128 MB each) at /user/joal/enwiki_20150304_json_metadata. It contains one JSON line per revision, with a limited set of fields: id, parent_id, timestamp, page, contributor, minor, bytes, and a special field called prev_rev containing the id and timestamp of the previously parsed revision (if in the same page) in the XML order. Reducing the information to that level allows us to generate interesting statistics about the enwiki dump using Spark with a reasonable execution time.
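For illustration, a single metadata line might look like the following (field values are invented; the contributor sub-fields are not detailed in this log, while the page and prev_rev sub-fields match the queries below):

{"id": 233192, "parent_id": 233180, "timestamp": "2001-01-21T02:12:21Z", "page": {"id": 12, "namespace": 0, ...}, "contributor": {...}, "minor": false, "bytes": 124, "prev_rev": {"id": 233180, "timestamp": "2001-01-20T15:01:12Z"}}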

Spark Shell


Launch spark-shell with refinery-job.jar on the classpath to take advantage of the JSON libraries:

/opt/spark/bin/spark-shell --master yarn --num-executors 64 --executor-memory 4G \
 --driver-class-path $(find /opt/hadoop/share/hadoop/mapreduce/lib/hadoop-lzo-* | head -n 1) --jars /home/joal/refinery-job.jar

This launches a HEAVY spark-shell instance (64 workers, 4G of RAM each). Please tweak the --num-executors 64 --executor-memory 4G parameters to adjust.
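For example, a lighter instance (16 executors with 2G of RAM each; the numbers are purely illustrative) could be launched with:

/opt/spark/bin/spark-shell --master yarn --num-executors 16 --executor-memory 2G \
 --driver-class-path $(find /opt/hadoop/share/hadoop/mapreduce/lib/hadoop-lzo-* | head -n 1) --jars /home/joal/refinery-job.jar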

Spark Analysis


Import some useful Spark and JSON libraries:

import org.json4s._
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._
import org.json4s.jackson.Serialization
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

Load the revs value by parsing the metadata files, caching it for faster reuse:

val revs = sc.textFile("/user/joal/enwiki_20150304_json_metadata").map(parse(_)).cache()

And now get some results:

// Revisions count
revs.count
Long = 599684931

// Unique pages count
revs.map(v => (v \ "page" \ "id").asInstanceOf[JInt].num.toLong).distinct.count() 
Long = 35332741

/*****************************************/

// Function printing the log10-based distribution of a pair RDD.
// We assume each pair contains an id (Long) and a Long value.
// Note: a value of 0 gives log10(0) = -Infinity, whose toInt is Int.MinValue
// (-2147483648), which explains that bucket in some of the outputs below.
def printlog10Dist(rdd: RDD[(Long, Long)]): Unit = {
  rdd.
    map(p => (math.log10(p._2).toInt, 1L)).
    aggregateByKey(0L)((v1, v2) => v1 + v2, (v1, v2) => v1 + v2).
    collect.foreach(println)
}

/*****************************************/

// Revisions per page
val revsPerPage = revs.
  map(v => ((v \ "page" \ "id").asInstanceOf[JInt].num.toLong, 1L)).
  aggregateByKey(0L)((v1, v2) => v1 + v2, (v1, v2) => v1 + v2)

// Revisions per page distribution, log10 base
printlog10Dist(revsPerPage)
(0,29051606)
(1,5358977)
(2,859891)
(3,61327)
(4,908)
(5,31)
(6,1)

// Top 32 pages by number of revisions (pageId, num_revisions)
revsPerPage.sortBy(p => p._2, ascending=false).take(32).foreach(println)
(1952670,1058630)
(5137507,821551)
(13784401,615671)
(16283969,582865)
(2535910,388984)
(352651,341462)
(972034,329975)
(2189161,326340)
(564696,326107)
(16927404,320463)
(2535875,308040)
(40297,282846)
(5149102,254172)
(11022716,253033)
(32101143,188999)
(2515121,170923)
(31530695,169290)
(6041086,161979)
(1470141,157437)
(12056747,153411)
(11238105,148006)
(1226609,145709)
(4626266,143302)
(8993207,141341)
(3252662,133308)
(3741656,133192)
(5030553,129121)
(3514978,119653)
(11005908,116607)
(9870625,115515)
(7535778,112431)
(6768170,106106)

/*****************************************/

// SUM(Bytes) per page
val sumBytesPerPage = revs.
  map(v => ((v \ "page" \ "id").asInstanceOf[JInt].num.toLong, (v \ "bytes").asInstanceOf[JInt].num.toLong)).
  aggregateByKey(0L)((v1, v2) => v1 + v2, (v1, v2) => v1 + v2)

// SUM(Bytes) per page distribution, log10 base
printlog10Dist(sumBytesPerPage)
(-2147483648,2577)
(0,103907)
(1,9381750)
(2,8964865)
(3,9582612)
(4,4477007)
(5,2072308)
(6,599491)
(7,131373)
(8,16307)
(9,520)
(10,23)
(11,1)

// Top 24 pages by SUM(Bytes) (pageId, sum_bytes)
sumBytesPerPage.sortBy(p => p._2, ascending=false).take(24).foreach(println)
(5137507,255748072182)
(2535910,60746530013)
(972034,60114973986)
(5149102,43014476215)
(2535875,36922541407)
(564696,35003941405)
(40297,32445227518)
(11424955,22193085230)
(32101143,21488079051)
(3706897,18549825060)
(1470141,17916266438)
(986140,17793871027)
(3252662,17645815648)
(16927404,17511855999)
(6768170,16721293033)
(3741656,13512581014)
(75321,12517129056)
(2515121,11889235356)
(6041086,11859473845)
(9870625,11845661846)
(6905700,11410275184)
(17820752,11347901892)
(10701605,10671585654)
(3514978,10141543074)

/*****************************************/

// Bytes per revision
val bytesPerRev = revs.
  map(v => ((v \ "id").asInstanceOf[JInt].num.toLong, (v \ "bytes").asInstanceOf[JInt].num.toLong)).
  aggregateByKey(0L)((v1, v2) => v1 + v2, (v1, v2) => v1 + v2)

// Bytes per revision distribution, log10 base
printlog10Dist(bytesPerRev)
(-2147483648,1348990)
(0,666045)
(1,28060977)
(2,58117910)
(3,263271820)
(4,229162391)
(5,19020550)
(6,36243)
(7,5)

// Top 5 revisions by bytes
bytesPerRev.sortBy(p => p._2, ascending=false).take(5).foreach(println)
(35775401,10597086)
(39456244,10369813)
(26720552,10245346)
(35891845,10241883)
(21299582,10112256)

/*****************************************/

// Unordered revisions: revisions whose timestamp is earlier than that of the
// previously parsed revision of the same page (in XML order)
val unord_revs = revs.filter(json => (((json \ "prev_rev") != JNothing)
                        && ((json \ "timestamp").asInstanceOf[JString].s < (json \ "prev_rev" \ "timestamp").asInstanceOf[JString].s)))

unord_revs.count()
Long = 51411

unord_revs.map(v => (v \ "page" \ "id").asInstanceOf[JInt].num.toLong).distinct.count()
Long = 36176


Next things to do

  • Deeper analysis of unordered revisions (big / small pages, many revisions in between unordered ones ...); a possible starting point is sketched after this list
  • Same analysis on the subset having namespace = 0 (articles):
val articles = sc.textFile("/user/joal/enwiki_20150304_json_metadata").map(parse(_)).
                  filter(j => (j \ "page" \ "namespace").asInstanceOf[JInt].num.toLong == 0L).cache()
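As a possible starting point for the unordered-revision analysis (a sketch only, not run yet; it reuses the unord_revs, revsPerPage and printlog10Dist values defined above):

// Pages containing at least one unordered revision, with their unordered-revision count
val unordPerPage = unord_revs.
  map(v => ((v \ "page" \ "id").asInstanceOf[JInt].num.toLong, 1L)).
  aggregateByKey(0L)((v1, v2) => v1 + v2, (v1, v2) => v1 + v2)

// Distribution (log10 base) of the total revision counts of those pages,
// to check whether unordered revisions concentrate on big or small pages
printlog10Dist(revsPerPage.join(unordPerPage).map { case (pageId, (totalRevs, _)) => (pageId, totalRevs) })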

JAllemandou (WMF) (talk | contribs) 20:52, 27 May 2015‎
signed by Halfak (WMF) (talk) 06:44, 28 May 2015 (UTC)
