微信搜索bigdata029 | 邀请体验:数阅–数据管理、OLAP分析与可视化平台 | 订阅本站 | 赞助作者:赞助作者

Spark MLlib实现的中文文本分类–Naive Bayes

Spark lxw1234@qq.com 18345℃ 26评论

关键字:spark mllib、文本分类、朴素贝叶斯、naive bayes

文本分类是指将一篇文章归到事先定义好的某一类或者某几类,在数据平台的一个典型的应用场景是,通过爬取用户浏览过的页面内容,识别出用户的浏览偏好,从而丰富该用户的画像。
本文介绍使用Spark MLlib提供的朴素贝叶斯(Naive Bayes)算法,完成对中文文本的分类过程。主要包括中文分词、文本表示(TF-IDF)、模型训练、分类预测等。

中文分词

对于中文文本分类而言,需要先对文章进行分词,我使用的是IKAnalyzer中文分析工具,之前有篇文章介绍过《中文分词工具-IKAnalyzer下载及使用》,其中自己可以配置扩展词库来使分词结果更合理,我从搜狗、百度输入法下载了细胞词库,将其作为扩展词库。这里不再介绍分词。

中文词语特征值转换(TF-IDF)

分好词后,每一个词都作为一个特征,但需要将中文词语转换成Double型来表示,通常使用该词语的TF-IDF值作为特征值,Spark提供了全面的特征抽取及转换的API,非常方便,详见http://spark.apache.org/docs/latest/ml-features.html,这里介绍下TF-IDF的API:

比如,训练语料/tmp/lxw1234/1.txt:

0,苹果 官网 苹果 宣布
1,苹果 梨 香蕉

逗号分隔的第一列为分类编号,0为科技,1为水果。

case class RawDataRecord(category: String, text: String)

val conf = new SparkConf().setMaster("yarn-client")
val sc = new SparkContext(conf)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._

//将原始数据映射到DataFrame中,字段category为分类编号,字段text为分好的词,以空格分隔
var srcDF = sc.textFile("/tmp/lxw1234/1.txt").map { 
      x => 
        var data = x.split(",")
        RawDataRecord(data(0),data(1))
}.toDF()

srcDF.select("category", "text").take(2).foreach(println)
[0,苹果 官网 苹果 宣布]
[1,苹果 梨 香蕉]
//将分好的词转换为数组
var tokenizer = new Tokenizer().setInputCol("text").setOutputCol("words")
var wordsData = tokenizer.transform(srcDF)

wordsData.select($"category",$"text",$"words").take(2).foreach(println)
[0,苹果 官网 苹果 宣布,WrappedArray(苹果, 官网, 苹果, 宣布)]
[1,苹果 梨 香蕉,WrappedArray(苹果, 梨, 香蕉)]

//将每个词转换成Int型,并计算其在文档中的词频(TF)
var hashingTF = 
new HashingTF().setInputCol("words").setOutputCol("rawFeatures").setNumFeatures(100)
var featurizedData = hashingTF.transform(wordsData)

这里将中文词语转换成INT型的Hashing算法,类似于Bloomfilter,上面的setNumFeatures(100)表示将Hash分桶的数量设置为100个,这个值默认为2的20次方,即1048576,可以根据你的词语数量来调整,一般来说,这个值越大,不同的词被计算为一个Hash值的概率就越小,数据也更准确,但需要消耗更大的内存,和Bloomfilter是一个道理。

featurizedData.select($"category", $"words", $"rawFeatures").take(2).foreach(println)
[0,WrappedArray(苹果, 官网, 苹果, 宣布),(100,[23,81,96],[2.0,1.0,1.0])]
[1,WrappedArray(苹果, 梨, 香蕉),(100,[23,72,92],[1.0,1.0,1.0])]

结果中,“苹果”用23来表示,第一个文档中,词频为2,第二个文档中词频为1.

//计算TF-IDF值
var idf = new IDF().setInputCol("rawFeatures").setOutputCol("features")
var idfModel = idf.fit(featurizedData)
var rescaledData = idfModel.transform(featurizedData)
rescaledData.select($"category", $"words", $"features").take(2).foreach(println)

[0,WrappedArray(苹果, 官网, 苹果, 宣布),(100,[23,81,96],[0.0,0.4054651081081644,0.4054651081081644])]
[1,WrappedArray(苹果, 梨, 香蕉),(100,[23,72,92],[0.0,0.4054651081081644,0.4054651081081644])]

//因为一共只有两个文档,且都出现了“苹果”,因此该词的TF-IDF值为0.

最后一步,将上面的数据转换成Bayes算法需要的格式,如:

https://github.com/apache/spark/blob/branch-1.5/data/mllib/sample_naive_bayes_data.txt

var trainDataRdd = rescaledData.select($"category",$"features").map {
    case Row(label: String, features: Vector) =>
    LabeledPoint(label.toDouble, Vectors.dense(features.toArray))
}

spark mllib

每一个LabeledPoint中,特征数组的长度为100(setNumFeatures(100)),”官网”和”宣布”对应的特征索引号分别为81和96,因此,在特征数组中,第81位和第96位分别为它们的TF-IDF值。

到此,中文词语特征表示的工作已经完成,trainDataRdd已经可以作为Bayes算法的输入了。

分类模型训练

训练模型,语料非常重要,我这里使用的是搜狗提供的分类语料库,很早之前的了,这里只作为学习测试使用。

下载地址在:http://www.sogou.com/labs/dl/c.html,语料库一共有10个分类:

C000007 汽车
C000008 财经
C000010  IT
C000013 健康
C000014 体育
C000016 旅游
C000020 教育
C000022 招聘
C000023 文化
C000024 军事

每个分类下有几千个文档,这里将这些语料进行分词,然后每一个分类生成一个文件,在该文件中,每一行数据表示一个文档的分词结果,重新用0-9作为这10个分类的编号:
0 汽车
1 财经
2 IT
3 健康
4 体育
5 旅游
6 教育
7 招聘
8 文化
9 军事

比如,汽车分类下的文件内容为:

spark mllib

数据准备好了,接下来进行模型训练及分类预测,代码:

package com.lxw1234.textclassification

import scala.reflect.runtime.universe

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.ml.feature.HashingTF
import org.apache.spark.ml.feature.IDF
import org.apache.spark.ml.feature.Tokenizer
import org.apache.spark.mllib.classification.NaiveBayes
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.sql.Row


object TestNaiveBayes {
  
  case class RawDataRecord(category: String, text: String)
  
  def main(args : Array[String]) {
    
    val conf = new SparkConf().setMaster("yarn-client")
    val sc = new SparkContext(conf)
    
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.implicits._
    
    var srcRDD = sc.textFile("/tmp/lxw1234/sougou/").map { 
      x => 
        var data = x.split(",")
        RawDataRecord(data(0),data(1))
    }
    
    //70%作为训练数据,30%作为测试数据
    val splits = srcRDD.randomSplit(Array(0.7, 0.3))
    var trainingDF = splits(0).toDF()
    var testDF = splits(1).toDF()
    
    //将词语转换成数组
    var tokenizer = new Tokenizer().setInputCol("text").setOutputCol("words")
    var wordsData = tokenizer.transform(trainingDF)
    println("output1:")
    wordsData.select($"category",$"text",$"words").take(1)
    
    //计算每个词在文档中的词频
    var hashingTF = new HashingTF().setNumFeatures(500000).setInputCol("words").setOutputCol("rawFeatures")
    var featurizedData = hashingTF.transform(wordsData)
    println("output2:")
    featurizedData.select($"category", $"words", $"rawFeatures").take(1)
    
    
    //计算每个词的TF-IDF
    var idf = new IDF().setInputCol("rawFeatures").setOutputCol("features")
    var idfModel = idf.fit(featurizedData)
    var rescaledData = idfModel.transform(featurizedData)
    println("output3:")
    rescaledData.select($"category", $"features").take(1)
    
    //转换成Bayes的输入格式
    var trainDataRdd = rescaledData.select($"category",$"features").map {
      case Row(label: String, features: Vector) =>
        LabeledPoint(label.toDouble, Vectors.dense(features.toArray))
    }
    println("output4:")
    trainDataRdd.take(1)
    
    //训练模型
    val model = NaiveBayes.train(trainDataRdd, lambda = 1.0, modelType = "multinomial")   
    
    //测试数据集,做同样的特征表示及格式转换
    var testwordsData = tokenizer.transform(testDF)
    var testfeaturizedData = hashingTF.transform(testwordsData)
    var testrescaledData = idfModel.transform(testfeaturizedData)
    var testDataRdd = testrescaledData.select($"category",$"features").map {
      case Row(label: String, features: Vector) =>
        LabeledPoint(label.toDouble, Vectors.dense(features.toArray))
    }
    
    //对测试数据集使用训练模型进行分类预测
    val testpredictionAndLabel = testDataRdd.map(p => (model.predict(p.features), p.label))
    
    //统计分类准确率
    var testaccuracy = 1.0 * testpredictionAndLabel.filter(x => x._1 == x._2).count() / testDataRdd.count()
    println("output5:")
    println(testaccuracy)
    
  }
}

执行后,主要输出如下:

output1:(将词语转换成数组)

spark mllib

output2:(计算每个词在文档中的词频)

spark mllib

output3:(计算每个词的TF-IDF)

spark mllib

output4:(Bayes算法的输入数据格式)

spark mllib

output5:(测试数据集分类准确率)

spark mllib

准确率90%,还可以。接下来需要收集分类更细,时间更新的数据来训练和测试了。。

您可以关注 lxw的大数据田地 ,或者 加入邮件列表 ,随时接收博客更新的通知邮件。

 

更新:

程序中使用的/tmp/lxw1234/sougou/目录下的文件提供下载:

链接: http://pan.baidu.com/s/1ntUyI9N 密码: i5nj

 

 

如果觉得本博客对您有帮助,请 赞助作者

转载请注明:lxw的大数据田地 » Spark MLlib实现的中文文本分类–Naive Bayes

喜欢 (30)
分享 (0)
发表我的评论
取消评论
表情

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址
(26)个小伙伴在吐槽
  1. 不错,整套流程都覆盖了
    smartlan2016-01-25 16:33 回复
  2. 你好想问下,这个model 训练好后,比如来一篇文章来 如何调用这个模型,知道属于那个分类
    小二黑2016-03-08 10:12 回复
    • :???: 你这个知道怎么弄了吗
      夕舞雪薇2016-04-27 17:11 回复
  3. /tmp/lxw1234/sougou/文件夹下的文件能否提供下,谢谢,1976716165@qq.com
    developer.chq2016-03-17 11:37 回复
  4. 可以分享下/tmp/lxw1234/sougou/的内容吗?谢谢
    憧憬的sun2016-03-20 17:58 回复
  5. 您好,有个问题想请教您。我如果要判断一条短信信息是否为某一类短信,一开始通过原始数据已经训练好模型并且保存了,如果我每次通过web提交一条短信,是要把Scala代码打包成jar,然后每次在命令行执行一次得到结果么?因为我看每次运行Scala代码时都要几秒启动spark,可以让某个Scala代码像一个服务一样一直开着处理提交的短信信息,而不是每次重新启动,处理一条信息后又关闭。
    running2016-04-05 09:04 回复
    • 参考这篇文章http://lxw1234.com/archives/2016/03/617.htm 可以启动一个长Application作为rpc server,它负责初始化模型,接收待分类数据,然后将分类结果返回给client。
      lxw1234@qq.com2016-04-05 09:21 回复
      • 好的,谢谢您 :mrgreen: 膜拜!
        running2016-04-05 10:25 回复
  6. Error:(59, 42) type Vector takes type parameters case Row(label: String,features: Vector) => ^ Error:(60, 63) value toArray is not a member of Any LabeledPoint(label.toDouble, Vectors.dense(features.toArray[Double])) ^ 大神报上面的错误怎么解决?
    尚少2016-04-11 17:22 回复
    • 找到问题了,包导错了 :arrow:
      尚少2016-04-11 17:53 回复
      • 导错什么包了,我也报这个错 Error:(61, 41) type Vector takes type parameters case Row(label: String, features: Vector) => ^ Error:(62, 61) value toArray is not a member of Any LabeledPoint(label.toDouble, Vectors.dense(features.toArray)) ^
        七匹猫2016-07-28 16:50 回复
      • 。。。您好,我也包这个错,请问导什么包解决?谢谢
        盟主2016-07-30 18:32 回复
      • 什么包?
        哎哟喂2016-10-20 17:02 回复
        • 应该导入import org.apache.spark.ml.linalg.Vector,楼主如果没运行过,千万别误人子弟啊
          pantakill2016-11-22 23:31 回复
          • 哥们,你真牛,你提醒了我,应该统一用ml包的类,现在东西更新太快,ml中的接口在mllib中根本不行了。。。
            Jimmy2017-03-16 22:25 (7天前)
  7. 大神写的真心好,什么时候能出python版本的呀?
    MaskRay2016-04-26 10:43 回复
  8. 您好,我在用您的源码,将贝叶斯分类器改为SVM分类器后,在 /* 转换成SVMWithSGD的输入格式 */时,出现了错误。转换成SVMWithSGD的输入格式代码: val trainDataRdd = trainRescaledData.select("category","features").map { case Row(label: String, features: Vector) => LabeledPoint(label.toDouble, Vectors.dense(features.toArray)) } 在使用您提供的搜狗语料时,运行正常没有问题。当我使用了两千篇语料进行测试时。在这个地方就出错了。出错信息如下: 16/08/19 16:44:37 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 14.0 (TID 535, 10.39.6.142): scala.MatchError: [0.0,(500000,[21060,29243,34393,57586,58517,87713,118386,128402,132755,135990,150213,157424,165320,192464,249655,273645,318360,323440,344739,348903,381632,388508,402939,409168,417990,442219,446177,455321,473958,482097,482477,490471],[2.1039143981985617,5.8971538676367405,3.9802312554546795,3.4992585948383703,3.6147714819602146,5.560681631015528,4.371097564141691,4.51085950651685,11.940693766364038,10.40801337415359,5.02168513028284,5.10869650727247,1.886190914353691,3.9802312554546795,17.484390256566765,5.714832310842786,3.429054336165122,3.41224721784874,0.9204201252161663,1.0931328229034842,5.560681631015528,15.612020061230385,8.421509828133024,4.7985415789686305,4.4620693423474185,2.824460552946621,5.8971538676367405,5.560681631015528,3.817712325956905,5.309367202734622,5.560681631015528,5.714832310842786])] (of class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema) at com.sina.dw.algo.SVMWithSGDSina$$anonfun$8.apply(SVMWithSGDSina.scala:84) at com.sina.dw.algo.SVMWithSGDSina$$anonfun$8.apply(SVMWithSGDSina.scala:84) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$$anon$10.next(Iterator.scala:312) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 貌似是scala.MatchError: 部分。到是。不知道scala哪里的模式匹配错误了。 希望您能给我指导下,万分感谢!
    六月麦茬2016-08-19 17:20 回复
    • 兄弟,你搞定了没,我也是这个错误
      Max2016-09-27 12:06 回复
      • 你好,这个问题解决了吗
        kaka2016-10-11 14:38 回复
    • 我也是这个错,后来你解决了吗
      哎哟喂2016-10-20 17:03 回复
      • 请问这个错误你解决了吗
        hanzhuang2016-10-23 22:00 回复
    • 我在用这个源码时也报你这个错误,会是什么原因呢?
      xxx2016-12-29 17:36 回复
    • 首先SVM最好用于二分类的问题吧,这个多分类的貌似不合适?我在spark2.0遇见过这个问题,主要是LabeledPoint的第二个参数需要处理,它是个SpareVector,要转为Vector
      hatter2017-01-04 17:27 回复
      • 哥们,怎么转换成Vector
        Jimmy2017-03-12 20:09 回复
  9. 有点不明白,对于贝叶斯分类来说,每条样本用tf-idf进行向量化,我觉得是不正确的,因为贝叶斯分类统计的是每条样本中的每个词和类别相关的先验概率和条件概率,而用tf-idf向量化之后,就无法准确的统计出那些概率,所以我觉得向量化是不正确的方式。希望进一步探讨
    kaka2016-10-11 10:06 回复
  10. 博主,请教一个问题: 我试着在spark-shell上跑源码是,在最后一步出现问题了: 最后一步,将上面的数据转换成Bayes算法需要的格式,如: https://github.com/apache/spark/blob/branch-1.5/data/mllib/sample_naive_bayes_data.txt var trainDataRdd = rescaledData.select($"category",$"features").map { case Row(label: String, features: Vector) => LabeledPoint(label.toDouble, Vectors.dense(features.toArray)) } 最后trainDataRdd res24: org.apache.spark.sql.Dataset[org.apache.spark.mllib.regression.LabeledPoint] = [label: double, features: vector] 是Dataset,并不是博主图片上的org.apache.spark.rdd.RDD trainDataRdd.take(1)时报错: WARN TaskSetManager: Lost task 0.0 in stage 14.0 (TID 40, 192.168.1.127, executor 2): scala.MatchError: [0,(100,[8,60,85],[0.4054651081081644,0.0,0.4054651081081644])] (of class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema) 查了很多:应该是Scala.MatchError,但不知道为什么,请博主和小伙伴们指点一二 我才用的是spark2.0
    Jimmy2017-03-12 16:41 回复