Bigram frequencies of query events using Apache Spark
Problem description:
I want to study user actions within sessions extracted from a search engine's query log. For now I define two kinds of actions: queries and clicks.
sealed trait Action
case class Query(input: String) extends Action
case class Click(link: String) extends Action
Assume the first action in the query log carries the following timestamp, in milliseconds:
val t0 = 1417444964686L // 2014-12-01 15:42:44
Let's define a corpus of time-ordered actions associated with session IDs:
val query_log:Array[(String, (Action, Long))] = Array (
("session1",(Query("query1"),t0)),
("session1",(Click("link1") ,t0+1000)),
("session1",(Click("link2") ,t0+2000)),
("session1",(Query("query2"),t0+3000)),
("session1",(Click("link3") ,t0+4000)),
("session2",(Query("query3"),t0+5000)),
("session2",(Click("link4") ,t0+6000)),
("session2",(Query("query4"),t0+7000)),
("session2",(Query("query5"),t0+8000)),
("session2",(Click("link5") ,t0+9000)),
("session2",(Click("link6") ,t0+10000)),
("session3",(Query("query6"),t0+11000))
)
Let's create an RDD from this query_log:
import org.apache.spark.rdd.RDD
var logs:RDD[(String, (Action, Long))] = sc.makeRDD(query_log)
Then we group the logs by session ID:
val sessions_groups:RDD[(String, Iterable[(Action, Long)])] = logs.groupByKey().cache()
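To sanity-check the grouping before going further, one can print each session's time-ordered actions. This is a small inspection sketch of my own (not part of the original code); it assumes the SparkContext `sc` from above and collects the whole RDD to the driver, so it is only suitable for toy data like this:

```scala
// Collect the grouped sessions to the driver and print each session's
// actions in timestamp order. collect() is fine here because the corpus is tiny.
sessions_groups.collect().foreach { case (sessionId, actions) =>
  val ordered = actions.toSeq.sortBy(_._2).map(_._1)
  println(s"$sessionId: ${ordered.mkString(" -> ")}")
}
```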
Now we want to study co-occurrences of actions within a session, for example the number of query rewritings per session. We therefore define a Cooccurrences class that is initialized from a session's actions.
case class Cooccurrences(
var numQueriesWithClicks:Int = 0,
var numQueries:Int = 0,
var numRewritings:Int = 0,
var numQueriesBeforeClicks:Int = 0
) {
// The co-occurrence object is initialized from the list of timestamped actions of one session
def initFromActions(actions:Iterable[(Action, Long)]) = {
// 30 seconds is the maximal time (in milliseconds) between two queries (q1, q2) to consider q2 a rewriting of q1
var thirtySeconds = 30000
var hasClicked = false
var hasRewritten = false
// In the observed action sequence, we extract consecutive actions (sliding(2)) sorted by timestamp.
// For each bigram in the sequence we update the co-occurrence counters.
actions.toSeq.sortBy(_._2).sliding(2).foreach{
// case Seq(l0) => // session with only one Action
case Seq((e1:Click, t0)) => { // click without any query
numQueries = 0
}
case Seq((e1:Query, t0)) => { // query without any click
numQueries = 1
numQueriesBeforeClicks = 1
}
// case Seq(l0, l1) => // session with at least two Actions
case Seq((e1:Click, t0), (e2:Query, t1)) => { // a click followed by a query
if(! hasClicked)
numQueriesBeforeClicks = numQueries
hasClicked = true
}
case Seq((e1:Click, t0), (e2:Click, t1)) => { // two consecutive clicks
if(! hasClicked)
numQueriesBeforeClicks = numQueries
hasClicked = true
}
case Seq((e1:Query, t0), (e2:Click, t1)) => { // a query followed by a click
numQueries += 1
if(! hasClicked)
numQueriesBeforeClicks = numQueries
hasClicked = true
numQueriesWithClicks +=1
}
case Seq((e1:Query, t0), (e2:Query, t1)) => { // two consecutive queries
val dt = t1 - t0
numQueries += 1
if(dt < thirtySeconds && e1.input != e2.input){
hasRewritten = true
numRewritings += 1
}
}
}
}
}
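As an aside, the counting above hinges on sliding(2) producing one bigram per pair of consecutive (timestamp-sorted) actions, plus a single one-element window when a session holds only one action. A minimal, Spark-free sketch of that behavior (hypothetical toy data, not taken from the log above):

```scala
// sliding(2) yields windows of at most 2 elements over the sorted sequence:
// a 3-element session gives two bigrams; a 1-element session gives one unigram.
val toy = Seq(("query1", 0L), ("click1", 1000L), ("click2", 2000L))
toy.sortBy(_._2).sliding(2).foreach {
  case Seq((a1, t1), (a2, t2)) => println(s"$a1 -> $a2 (dt = ${t2 - t1} ms)")
  case Seq((a1, _))            => println(s"single action: $a1")
}
```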
Now let's try to compute the co-occurrences RDD, one object per session:
val session_cooc_stats:RDD[Cooccurrences] = sessions_groups.map{
case (sessionId, actions) => {
var coocs = Cooccurrences()
coocs.initFromActions(actions)
coocs
}
}
Unfortunately, it raises the following MatchError:
scala> session_cooc_stats.take(2)
15/02/06 22:50:08 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 4) scala.MatchError: List((Query(query3),1417444969686), (Click(link4),1417444970686)) (of class scala.collection.immutable.$colon$colon) at $line25.$read$$iwC$$iwC$Cooccurrences$$anonfun$initFromActions$2.apply(<console>:29)
at $line25.$read$$iwC$$iwC$Cooccurrences$$anonfun$initFromActions$2.apply(<console>:29)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at $line25.$read$$iwC$$iwC$Cooccurrences.initFromActions(<console>:29)
at $line28.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:31)
at $line28.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:28)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.rdd.RDD$$anonfun$26.apply(RDD.scala:1081)
at org.apache.spark.rdd.RDD$$anonfun$26.apply(RDD.scala:1081)
at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1314)
at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1314)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
15/02/06 22:50:08 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 4, localhost): scala.MatchError: List((Query(query3),1417444969686), (Click(link4),1417444970686)) (of class scala.collection.immutable.$colon$colon)
at $line25.$read$$iwC$$iwC$Cooccurrences$$anonfun$initFromActions$2.apply(<console>:29)
at $line25.$read$$iwC$$iwC$Cooccurrences$$anonfun$initFromActions$2.apply(<console>:29)
...
If I build my own action list, equivalent to the first group of the session_cooc_stats RDD:
val actions:Iterable[(Action, Long)] = Array(
(Query("query1"),t0),
(Click("link1") ,t0+1000),
(Click("link2") ,t0+2000),
(Query("query2"),t0+3000),
(Click("link3") ,t0+4000)
)
I get the expected result:
var c = Cooccurrences()
c.initFromActions(actions)
// c == Cooccurrences(2,2,0,1)
Something goes wrong when I build a Cooccurrences object from the RDD. It seems linked to the CompactBuffer built by groupByKey().
What is missing?
I am new to Spark and Scala. Thanks in advance for your help.
Thomas
Answer:
I set up your code in IntelliJ.
I created classes for Action, Query, Click, and Cooccurrences, and put your code in a main method.
val sessions_groups:RDD[(String, Iterable[(Action, Long)])] = logs.groupByKey().cache()
val session_cooc_stats:RDD[Cooccurrences] = sessions_groups.map{
case (sessionId, actions) => {
val coocs = Cooccurrences()
coocs.initFromActions(actions)
coocs
}
}
session_cooc_stats.take(2).foreach(println(_))
I just changed `var coocs` to `val coocs`; I think that was it.
I get this output:
Cooccurrences(0,1,0,1)
Cooccurrences(2,3,1,1)
Answer:
As you suggested, I rewrote the code in IntelliJ and created a companion object with a main function. Surprisingly, the code compiles (with sbt) and runs flawlessly.
However, I don't understand why the compiled code runs while the same code fails in spark-shell.
Thank you for your answer!
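For anyone hitting the same MatchError in spark-shell, a defensive variant (my own sketch, not from the original thread) is to end the match with a catch-all case, so an unmatched bigram is reported instead of aborting the Spark task:

```scala
sealed trait Action
case class Query(input: String) extends Action
case class Click(link: String) extends Action

val actions: Seq[(Action, Long)] = Seq((Query("q3"), 0L), (Click("l4"), 1000L))

// Same sliding(2) traversal, but with a wildcard case: if a bigram fails the
// typed patterns (as can happen with case classes defined in the REPL), it is
// logged rather than throwing a scala.MatchError.
actions.sortBy(_._2).sliding(2).foreach {
  case Seq((q: Query, _), (c: Click, _))   => println(s"query then click: $q -> $c")
  case Seq((q1: Query, _), (q2: Query, _)) => println(s"query pair: $q1 -> $q2")
  case other                               => println(s"unmatched bigram: $other")
}
```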
I just plugged it in and it works for me... – 2015-02-07 01:37:01
Can you really execute the Spark instruction `session_cooc_stats.take(2)` without any change? I still get a scala.MatchError: List((Query(query3),1417444969686), (Click(link4),1417444970686)) – 2015-02-07 08:03:44