NSQ消息队列接受消息并存入mongodb中
首先build.gradle添加依赖如下:
dependencies { //mongodb依赖 compile group: 'org.springframework.data', name: 'spring-data-mongodb', version: '2.0.7.RELEASE' compile group: 'org.mongodb', name: 'mongo-java-driver', version: '3.8.0-beta3' //Gson依赖(相当与java中用的Json依赖) compile group: 'com.google.code.gson', name: 'gson', version: '2.8.5' //NSQ依赖 compile group: 'com.github.brainlag', name: 'nsq-client', version: '1.0.0.RC4' compile group: 'com.github.mitallast', name: 'scala-nsq_2.12', version: '1.12' //vertx依赖 compile group: 'io.vertx', name: 'vertx-core', version: '3.5.2' compile group: 'io.vertx', name: 'vertx-web', version: '3.5.2' //kotlin依赖 compile "org.jetbrains.kotlin:kotlin-stdlib-jdk8:$kotlin_version" testCompile group: 'junit', name: 'junit', version: '4.12' }
package com.xctl.read import com.github.brainlag.nsq.NSQConsumer import com.github.brainlag.nsq.NSQMessage import com.github.brainlag.nsq.callbacks.NSQMessageCallback import com.github.brainlag.nsq.lookup.DefaultNSQLookup import com.google.gson.Gson import com.mongodb.MongoClient import com.mongodb.MongoCredential import com.mongodb.ServerAddress import com.xctl.xmodel.HotelRoomInformation import io.vertx.core.AbstractVerticle import io.vertx.core.Vertx import org.bson.Document import java.util.* fun main(args: Array<String>) { ReadAndSaveDataMongodb().start() } class ReadAndSaveDataMongodb: AbstractVerticle(){ override fun start() { super.start() val vertx= Vertx.vertx() vertx.deployVerticle("1"){ ReadAndSaveDataMongodb().nsqConsumer() } } //NSQ消费者 fun nsqConsumer() { val lookup = DefaultNSQLookup() lookup.addLookupAddress("39.106.196.5", 3161) val consumer = NSQConsumer(lookup, "app.online.dcupload.roomEnergy", "nsq_to_file", object : NSQMessageCallback { override fun message(message: NSQMessage) { //接受bytearray数据 val b = message.getMessage() //bytearray转String(json) val c= String(b) //json转model(pojo对象) val HotelRoomInformation=Gson().fromJson<HotelRoomInformation>(c,HotelRoomInformation::class.java) //model(pojo对象)转json val json=Gson().toJson(HotelRoomInformation) //连接mongodb数据库 try { //连接到MongoDB服务 如果是远程连接可以替换“localhost”为服务器所在IP地址 //ServerAddress()两个参数分别为 服务器地址 和 端口 val serverAddress = ServerAddress("39.106.196.5", 27017) val addrs = ArrayList<ServerAddress>() addrs.add(serverAddress) //MongoCredential.createScramSha1Credential()三个参数分别为 用户名 数据库名称 密码 val credential = MongoCredential.createScramSha1Credential("house_keeper", "house_keeper", "house_keeper".toCharArray()) val credentials = ArrayList<MongoCredential>() credentials.add(credential) //通过连接认证获取MongoDB连接 val mongoClient = MongoClient(addrs, credentials) //连接到数据库 val mongoDatabase = mongoClient.getDatabase("house_keeper") //连接collection val collection=mongoDatabase.getCollection("HotelRoomInformation") //插入数据 val document=Document.parse(json) collection.insertOne(document) } catch (e: Exception) { System.err.println(e.javaClass.name + ": " + e.message) } message.finished() } }) consumer.start() } } 先运行如果有发送的消息,将会接受消息并存入mongodb
上面已经运行,现在我自己用可视化工具发送一条信息,如下:
此时我的mongodb新增加了一条信息,如图: