Reading data from Kafka and writing it to MySQL: exceptions encountered and how they were handled

 

import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

object KafkaWriteMsql {
  // Connect to MySQL and write a single record parsed from one Kafka message
  def Mysql(rel: String) = {
    println(rel)
    val data = rel.split("\t")
    val connection: Connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/rng_comment?characterEncoding=UTF-8","root","root")
    val sql = "REPLACE INTO t_user_counts (id, username, count) VALUES (?, ?, ?)"
    val ps: PreparedStatement = connection.prepareStatement(sql)
    ps.setInt(1, data(0).toInt)
    ps.setString(2, data(8))
    ps.setInt(3, 1)
    ps.executeUpdate()
    ps.close()
    connection.close()

  }
  // Query MySQL, aggregate in real time, then write the results back to MySQL
  def Mysql02() = {
    val connection: Connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/rng_comment?characterEncoding=UTF-8", "root", "root")
    // Query the per-user counts from the table
    val statement: PreparedStatement = connection.prepareStatement("SELECT username, COUNT(*) count FROM `t_user_counts` GROUP BY username")
    // Execute the query and get the result set
    val rel = statement.executeQuery()
    // Prepare the insert once and reuse it for every row
    val ps: PreparedStatement = connection.prepareStatement("REPLACE INTO t_word_counts2 (id, username, count) VALUES (null, ?, ?)")
    while (rel.next()) {
      // Read the columns of the current row
      val username: String = rel.getString("username")
      val count: Int = rel.getInt("count")
      ps.setString(1, username)
      ps.setInt(2, count)
      ps.executeUpdate()
    }
    ps.close()
    statement.close()
    connection.close()
  }
  // Read the data from Kafka, process it, then call the MySQL methods above
  def main(args: Array[String]): Unit = {
    // 1. Create a SparkConf
    val conf = new SparkConf().setAppName("KafkaWriteMsql").setMaster("local[*]").set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    // 2. Create a SparkContext
    val sc = new SparkContext(conf)

    sc.setLogLevel("WARN")
    // 3. Create a StreamingContext with a 5-second batch interval
    val ssc = new StreamingContext(sc, Seconds(5))
    // Set the checkpoint directory
    ssc.checkpoint("./cache")


    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "node01:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "KafkaWriteMsql",
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val kafkaDatas: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Array("RNGComment"), kafkaParams))



    // Exercise 1: filter out the records whose name field matches "刘恒"
//    val name = kafkaDatas.filter(_.value().split("\t").length == 11).filter(_.value().split("\t")(8) == "刘恒")
//    name.foreachRDD(rdd => {
//      rdd.foreach(record => Mysql(record.value()))
//    })
    // Exercise 2: count the records for the name in real time and update t_user_counts2
    Mysql02()
//    val name = kafkaDatas.filter(_.value().split("\t").length == 11).filter(_.value().split("\t")(8) == "刘恒").count()
//
//    name.foreachRDD(rdd => {
//      rdd.foreach(x => Mysql02(x.toString))
//    })


    kafkaDatas.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
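The writes above rely on MySQL's `REPLACE INTO`, which is worth spelling out: when a row with the same primary or unique key already exists, `REPLACE` deletes the old row and inserts the new one, unlike a plain `INSERT` (which fails with a duplicate-key error) or `INSERT ... ON DUPLICATE KEY UPDATE` (which updates the existing row in place). A sketch with the table from the code above (the values are illustrative):

```sql
-- REPLACE deletes any row with the same primary key, then inserts the new one
REPLACE INTO t_user_counts (id, username, count) VALUES (1, '刘恒', 1);

-- A common alternative that updates the existing row instead of delete + insert
INSERT INTO t_user_counts (id, username, count) VALUES (1, '刘恒', 1)
  ON DUPLICATE KEY UPDATE count = count + 1;
```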

The ResultSet works like a linked-list cursor.

resultSet is the result set of your query; rs.next() acts like a pointer that returns true or false. The cursor starts before the first row, and each call moves it down one row; if it returns true, there is another record.
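The cursor semantics described above can be sketched with a tiny stand-in class (an analogy only, not the JDBC API itself): next() starts before row 0 and advances one row per call, and reading a column is only valid after next() has returned true.

```scala
// A toy cursor with the same semantics as ResultSet.next():
// it starts BEFORE the first row and advances one row per call.
class ToyCursor[A](rows: Seq[A]) {
  private var pos = -1                  // start position: before row 0
  def next(): Boolean = { pos += 1; pos < rows.length }
  def get: A = rows(pos)                // only valid after next() returned true
}

object CursorDemo {
  def main(args: Array[String]): Unit = {
    val cur = new ToyCursor(Seq("row1", "row2"))
    while (cur.next()) println(cur.get) // prints row1, then row2
    println(cur.next())                 // prints false: no more records
  }
}
```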

 

 

had a not serializable result: org.apache.kafka.clients.consumer.ConsumerRecord

This means a Spark job's result contained ConsumerRecord objects, which are not serializable, so the executors could not ship them back to the driver.

Solution: either extract the plain String payload with record.value() before any operation that has to serialize results (collect, count on records, printing records), or switch Spark to Kryo serialization, which is what the spark.serializer = KryoSerializer setting in the SparkConf above does.

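The root cause can be reproduced with plain JVM serialization, outside Spark entirely. In this sketch, Record is a hypothetical stand-in for ConsumerRecord: a holder class that does not implement Serializable fails to serialize, while the String extracted from it serializes fine.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Stand-in for ConsumerRecord: holds a value but is NOT Serializable
class Record(val value: String)

object SerializationDemo {
  // Returns true if the object survives Java serialization
  def serializable(obj: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    val rec = new Record("1\tsome\tfields")
    println(serializable(rec))       // false: the record itself cannot be shipped
    println(serializable(rec.value)) // true: extract the String first
  }
}
```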

java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute

Spark Streaming refuses to start unless at least one output operation (print(), foreachRDD, saveAsTextFiles, ...) is registered on a DStream before ssc.start(). In the code above, kafkaDatas.print() is what satisfies this requirement.


Fixing java.sql.SQLException: After end of result set. This is thrown when a getXxx method is called after rs.next() has already returned false (the cursor has moved past the last row) or after the ResultSet has been closed. Keep every column read inside the while (rs.next()) loop, as Mysql02 above does.
