从 Kafka 中读取数据并写入 MySQL 时遇到的异常和操作展示
/**
 * Spark Streaming job that subscribes to the Kafka topic `RNGComment` and
 * persists results into the MySQL database `rng_comment`.
 *
 * Contains two JDBC helpers ([[Mysql]] and [[Mysql02]]) and the streaming
 * entry point ([[main]]).
 */
object KafkaWriteMsql {

  // JDBC settings shared by both helpers.
  private val JdbcUrl = "jdbc:mysql://localhost:3306/rng_comment?characterEncoding=UTF-8"
  private val JdbcUser = "root"
  private val JdbcPassword = "root"

  /**
   * Writes one tab-separated record into `t_user_counts`.
   *
   * Field 0 is parsed as the integer id and field 8 is stored as the
   * username; the count column is always written as "1".
   *
   * @param rel a tab-separated record; must have at least 9 fields and a
   *            numeric first field, otherwise an exception is thrown
   */
  def Mysql(rel: String): Unit = {
    println(rel)
    val data = rel.split("\t")
    val connection: Connection = DriverManager.getConnection(JdbcUrl, JdbcUser, JdbcPassword)
    // try/finally so the connection and statement are released even when
    // parsing or the update fails.
    try {
      val ps: PreparedStatement =
        connection.prepareStatement("REPLACE into t_user_counts (id,username,count) values (?,?,?)")
      try {
        ps.setInt(1, data(0).toInt)
        ps.setString(2, data(8))
        ps.setString(3, "1")
        ps.executeUpdate()
      } finally {
        ps.close()
      }
    } finally {
      connection.close()
    }
  }

  /**
   * Reads the per-username row counts from `t_user_counts` and writes one
   * row per username into `t_word_counts2`.
   *
   * NOTE(review): the exercise text in `main` mentions `t_user_counts2`, but
   * the statement here targets `t_word_counts2` — confirm the intended table.
   */
  def Mysql02(): Unit = {
    val connection: Connection = DriverManager.getConnection(JdbcUrl, JdbcUser, JdbcPassword)
    try {
      // Aggregate the source table by username.
      val select: PreparedStatement = connection.prepareStatement(
        "SELECT username,COUNT(*) count FROM `t_user_counts` GROUP BY username")
      try {
        // Prepared once and reused for every row instead of once per row.
        val insert: PreparedStatement = connection.prepareStatement(
          "REPLACE into t_word_counts2 (id,username,count) values (null,?,?)")
        try {
          val rows = select.executeQuery()
          // next() advances the cursor; it returns false once past the last row.
          while (rows.next()) {
            insert.setString(1, rows.getString("username"))
            insert.setInt(2, rows.getInt("count"))
            insert.executeUpdate()
          }
        } finally {
          insert.close()
        }
      } finally {
        // Closing the statement also closes the ResultSet it produced.
        select.close()
      }
    } finally {
      connection.close()
    }
  }

  /**
   * Entry point: builds the streaming context, subscribes to Kafka, runs
   * [[Mysql02]] once, and prints each batch.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // 1. Spark configuration.
    // NOTE(review): Kryo is presumably configured because ConsumerRecord is
    // not java.io.Serializable and print() ships records to the driver — confirm.
    val conf = new SparkConf()
      .setAppName("KafkaWriteMsql")
      .setMaster("local[*]")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

    // 2. SparkContext.
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")

    // 3. StreamingContext with 5-second batches.
    val ssc = new StreamingContext(sc, Seconds(5))
    // Checkpoint directory.
    ssc.checkpoint("./cache")

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "node01:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "KafkaWriteMsql",
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val kafkaDatas: InputDStream[ConsumerRecord[String, String]] =
      KafkaUtils.createDirectStream[String, String](
        ssc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](Array("RNGComment"), kafkaParams))

    // Exercise 1 (disabled): keep only the 11-field records whose name field
    // matches, and write each one via Mysql.
    // val name = kafkaDatas.filter(_.value().split("\t").length==11).filter(_.value().split("\t")(8)=="刘恒")
    // name.foreachRDD(x=>{
    //   x.foreach(x=>Mysql(x.value()))
    // })

    // Exercise 2: count the records for one's own name and update the counts
    // table. Runs once on the driver, before the streaming context starts.
    Mysql02()
    // Disabled streaming variant of exercise 2:
    // val name = kafkaDatas.filter(_.value().split("\t").length==11).filter(_.value().split("\t")(8)=="刘恒").count()
    // name.foreachRDD(x=>{
    //   x.foreach(x=>Mysql02(x.toString))
    // })

    // At least one output operation must be registered before start().
    kafkaDatas.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
链表
ResultSet 是查询返回的结果集合;rs.next() 相当于移动一个游标并返回 true 或 false:游标初始位于第一行之前,每调用一次向下移动一行,返回 true 说明当前行有记录,返回 false 说明已经越过最后一行
had a not serializable result: org.apache.kafka.clients.consumer.ConsumerRecord
出现了一个不可序列化的结果:org.apache.kafka.clients.consumer.ConsumerRecord
解决方法
java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute
java.lang.IllegalArgumentException:要求未满足:没有注册任何输出操作,因此没有可执行的内容
解决java.sql.SQLException: After end of result set