
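I had planned to upload the source code as an archive, but my company's security policy is strict: uploads to sites such as **** and GitHub are detected and blocked on company machines, so I can only paste the code into this post. All code related to company business has been removed, leaving a generic shell that is still runnable. That is a fair amount of pasted code, but it is complete, so the project can be fully restored from this post.
All of the code is posted here, but the file layout is lost in the process, so below is a screenshot of the original project structure. The whole project has only 19 classes, so restoring it is easy. The screenshot of the complete project: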


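This is a complete Java program built on the producer-consumer pattern: producers keep generating messages and put them into a thread-safe queue, while consumers keep taking messages from the queue and sending them to the Kafka cluster.
1. totalCount: the total number of messages to send, set in the configuration file.
2. initialCount: the consumers are started only after this many messages have been produced. If it is 0, the message-generating threads and the message-sending threads start at the same time.
3. bootstrap.servers: the addresses of the Kafka brokers, in the form ip:port (host name + port is not allowed). Separate multiple nodes with commas.
4. zookeeper.connector: the addresses of the ZooKeeper cluster that Kafka depends on, also in the form ip:port (host name + port is not allowed). Separate multiple nodes with commas.
5. request.required.acks=1: with 1, a send counts as successful only after the Kafka cluster has acknowledged it; with 0, the client does not wait for any acknowledgement. 0 gives higher throughput but cannot guarantee that a message was delivered.
6. produce_thread_count: the number of message-generating threads.
7. send_thread_count: the number of message-sending threads.
8. topic: the target topic the messages are sent to.
9. print_progress: set to true to print the sending progress.
10. print_progress_strategy: how progress is printed. ONE_BY_ONE reports after every message produced or consumed, INTERVAL reports once every n messages, and PERCENTAGE reports progress as a percentage.
11. print_progress_interval: the reporting interval. With 50, progress is printed once every 50 messages; with 0, progress is printed after every message.
12. msg_generate_class: the class that generates messages. The published code contains only one implementation, com.core.service.SimpleMsgGenerator, which is unrelated to company business and generates the simple message "hello kafka". Readers can plug in their own implementation to suit their business.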
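The interplay of totalCount and initialCount described above can be sketched with nothing but the JDK. This is a minimal illustration, not the project's code: the class name `GatedPipelineSketch` and the method `run` are made up for this example, it uses one producer and one consumer, and it assumes `0 <= initialCount <= totalCount` and `totalCount > 0`.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: one producer, one consumer, and a latch that holds the
// consumer back until initialCount messages have been queued.
public class GatedPipelineSketch {

    /** Returns the number of messages the consumer took from the queue. */
    public static int run(int totalCount, int initialCount) {
        // Capacity equals totalCount, so the producer never blocks in this sketch.
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(totalCount);
        CountDownLatch gate = new CountDownLatch(1);
        AtomicInteger consumed = new AtomicInteger();

        Thread producer = new Thread(() -> {
            if (initialCount == 0) {
                gate.countDown();          // no threshold: consumer may start at once
            }
            for (int i = 1; i <= totalCount; i++) {
                queue.add("hello kafka");
                if (i == initialCount) {
                    gate.countDown();      // threshold reached: release the consumer
                }
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                gate.await();              // wait until the initial backlog exists
                for (int i = 0; i < totalCount; i++) {
                    queue.take();          // a real sender would hand this to Kafka
                    consumed.incrementAndGet();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return consumed.get();
    }

    public static void main(String[] args) {
        System.out.println(run(1000, 100)); // prints 1000
    }
}
```

The real program uses several threads on each side and counts sent messages in the Kafka callback, so its bookkeeping is more involved than this sketch.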

As the screenshot above shows, the configuration file is named send-cfg.properties. The complete list of entries is:



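# ip + port; host name + port is not allowed

# ip + port; host name + port is not allowed

# 1: a send succeeds only after the broker acknowledges it; 0: do not wait for an acknowledgement. 0 gives higher throughput but cannot guarantee delivery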





# ONE_BY_ONE: report after every message produced or consumed; INTERVAL: report once every n messages; PERCENTAGE: report progress as a percentage


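To make the shape of send-cfg.properties concrete, here is a small sketch that parses such a file from a string. The keys are the ones described in this post; every value (the addresses, counts and topic name) is a placeholder chosen for illustration, and the class `SendCfgSketch` with its helpers is hypothetical rather than part of the project.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

// Hypothetical sketch: the keys come from the post, all values are placeholders.
public class SendCfgSketch {

    public static final String SAMPLE = String.join("\n",
            "totalCount=100000",
            "initialCount=1000",
            "bootstrap.servers=192.0.2.10:9092,192.0.2.11:9092",
            "zookeeper.connector=192.0.2.10:2181",
            "request.required.acks=1",
            "produce_thread_count=2",
            "send_thread_count=2",
            "topic=MY_TOPIC",
            "print_progress=true",
            "print_progress_strategy=INTERVAL",
            "print_progress_interval=50",
            "msg_generate_class=com.core.service.SimpleMsgGenerator");

    /** Parses properties text into a Properties object. */
    public static Properties load(String text) {
        Properties p = new Properties();
        try {
            p.load(new StringReader(text));
        } catch (IOException e) {
            throw new IllegalStateException("cannot parse properties text", e);
        }
        return p;
    }

    /** Reads an int, falling back to dft when the key is missing or not a number. */
    public static int intOrDefault(Properties p, String key, int dft) {
        String raw = p.getProperty(key);
        if (raw == null || raw.trim().isEmpty()) {
            return dft;
        }
        try {
            return Integer.parseInt(raw.trim());
        } catch (NumberFormatException e) {
            return dft;
        }
    }

    public static void main(String[] args) {
        Properties p = load(SAMPLE);
        System.out.println(intOrDefault(p, "totalCount", 0));           // prints 100000
        System.out.println(p.getProperty("bootstrap.servers").split(",").length); // prints 2
    }
}
```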

run.bat is the script for running the packaged jar on Windows. Its content is:

echo hello kafka

java -Xdebug -Xrunjdwp:server=y,transport=dt_socket,address=8999,suspend=n  -Xms1024M -Xmx1024M -XX:PermSize=512M -XX:MaxNewSize=256M -XX:MaxPermSize=512M  -jar bodySender-1.0.0-SNAPSHOT-jar-with-dependencies.jar send-cfg.properties


run.sh is the script for running the packaged jar on Linux. Its content is:

#!/bin/bash
runPath=$(cd "$(dirname "$0")"; pwd)
cd "$runPath"

echo "the script is located in directory: $runPath"
echo "config file is send-cfg.properties"

# An optional first argument is the JDK install path.
if [ $# -ge 1 ]; then

    echo "java path: $1"
        "$1/bin/java" -Xdebug -Xrunjdwp:server=y,transport=dt_socket,address=8999,suspend=n  -Xms1024M -Xmx1024M -XX:PermSize=512M -XX:MaxNewSize=256M -XX:MaxPermSize=512M  -jar  bodySender-1.0.0-SNAPSHOT-jar-with-dependencies.jar send-cfg.properties

else

    echo "no java path given, assuming java is already on the PATH"
        java -Xdebug -Xrunjdwp:server=y,transport=dt_socket,address=8999,suspend=n  -Xms1024M -Xmx1024M -XX:PermSize=512M -XX:MaxNewSize=256M -XX:MaxPermSize=512M  -jar bodySender-1.0.0-SNAPSHOT-jar-with-dependencies.jar send-cfg.properties

fi


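Two files, 打包方式.md (packaging instructions) and readme.txt (run instructions), describe how to package and run the program. The packaging steps are:


  1. Build with Maven to get bodySender-1.0.0-SNAPSHOT-jar-with-dependencies.jar
  2. Create a folder named sendTool
  3. Copy bodySender-1.0.0-SNAPSHOT-jar-with-dependencies.jar, run.bat, run.sh, readme.txt and send-cfg.properties into that folder
  4. Send sendTool to the testers, who run the program by following the run instructions in readme.txt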




  1. Copy the sendTool folder to the Linux machine. To change the configuration, edit send-cfg.properties in the sendTool directory

  2. Run vim run.sh on the command line to edit run.sh

  3. In vim's command mode, type :set ff=unix and press Enter to change the file format

  4. Type :wq to save and quit vim

  5. Run chmod 777 run.sh to make the file executable

  6. If the Java environment variables are already configured on the Linux machine, run ./run.sh to start the program. Otherwise run ./run.sh /usr/lib/cluster001/jdk1.8.0_162

  7. Note: /usr/lib/cluster001/jdk1.8.0_162 is the Java install path and must be adjusted to your environment. The path argument must not end with "/"; for example, ./run.sh /usr/lib/cluster001/jdk1.8.0_162/ makes the program fail

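With the explanation above, what the program does should be fairly clear. To run it from a project restored in an IDE, run the main method of the APP class directly. To run it from the packaged jar, follow the steps in readme.txt.

The complete source code follows (19 classes; see the screenshot above for the package of each class):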

package com.core;

import com.core.constant.Config;
import com.core.service.Container;
import com.core.service.ProduceThread;
import com.core.service.SendThread;
import com.core.util.JsonUtils;
import com.core.util.PropertiesFile;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.ArrayList;
import java.util.List;
import java.util.Scanner;
import java.util.concurrent.CountDownLatch;

public class APP {

    private static final String CONFIG_FILE = "send-cfg.properties";

    public static void main(String... args) {
        run(args);

        // Wait for a non-blank line of input before exiting.
        Scanner scan = new Scanner(System.in);
        String line = null;
        do {
            line = scan.nextLine();
        } while (StringUtils.isBlank(line));
    }

    public static void run(String... args) {

        long start = System.currentTimeMillis();

        // No argument: load the bundled config from the classpath; otherwise load the given file.
        PropertiesFile prop = null;
        if (args == null || args.length == 0) {
            prop = new PropertiesFile(CONFIG_FILE, false);
        } else {
            prop = new PropertiesFile(args[0], true);
        }

        Config config = new Config(prop);
        check(config);

        System.out.println("Configured parameters:\r\n" + JsonUtils.object2Json(config, true));

        Container<ProducerRecord<byte[], byte[]>> container = new Container<ProducerRecord<byte[], byte[]>>(config.getiInitialCount() + 10000);

        Integer produceThreadNum = config.getProduceThreadNum();
        List<Thread> producerList = new ArrayList<Thread>();

        // Each producer thread counts down once after initialCount messages exist.
        CountDownLatch produceSwitch = new CountDownLatch(config.getProduceThreadNum());
        for (int i = 0; i < produceThreadNum; i++) {
            Thread t = new Thread(new ProduceThread(config, container, produceSwitch));
            t.setName("produce-thread-" + i);
            producerList.add(t);
            t.start();
        }

        try {
            produceSwitch.await();
            System.out.println("Reached the initial count of " + config.getiInitialCount() + " messages, start sending");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        Integer sendThreadNum = config.getSendThreadNum();
        CountDownLatch consumeStartSwitch = new CountDownLatch(1);
        CountDownLatch consumeEndSwitch = new CountDownLatch(sendThreadNum);
        List<Thread> senderList = new ArrayList<Thread>();
        for (int i = 0; i < sendThreadNum; i++) {
            Thread t = new Thread(new SendThread(config, container, consumeStartSwitch, consumeEndSwitch));
            t.setName("send-thread-" + i);
            senderList.add(t);
            t.start();
        }
        // Release all sender threads at once.
        consumeStartSwitch.countDown();

        try {
            // Block until every sender thread has finished.
            consumeEndSwitch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("Messages to send in total: " + config.getTotalCount());

        // A negative number already carries its own minus sign.
        int error = (container.getConsumeCount() - config.getTotalCount());
        System.out.println("Deviation in the number of messages sent: [" + (error > 0 ? "+" : "") + error + "]");

        System.out.println("Elapsed: " + (System.currentTimeMillis() - start) + " ms");
    }

    public static void check(Config config) {
        if (config == null) {
            return;
        }

        if (config.getTotalCount() <= 0 || config.getiInitialCount() < 0) {
            throw new IllegalArgumentException("require: totalCount > 0 && initialCount >= 0");
        }

        if (config.getiInitialCount() > config.getTotalCount()) {
            throw new IllegalArgumentException("require: initialCount <= totalCount");
        }

        if (config.getProduceThreadNum() <= 0 || config.getSendThreadNum() <= 0) {
            throw new IllegalArgumentException("require: produce_thread_count > 0 && send_thread_count > 0");
        }
    }
}

package com.core.constant;

import com.core.util.PropertiesFile;

public class Config {

    private PropertiesFile props;
    private int totalCount;
    private String bootstrap_servers;
    private String zookeeper_connector;
    private String request_required_acks;
    private int produce_thread_count;
    private int send_thread_count;
    private int initialCount;
    private String topic;
    private boolean print_progress;
    private String print_progress_strategy;
    private int print_progress_interval;
    private String msg_generate_class;

    public Config(PropertiesFile props) {
        this.props = props;
        this.totalCount = props.getIntProperty("totalCount", 0);
        this.bootstrap_servers = props.getStringProperty("bootstrap.servers");
        this.zookeeper_connector = props.getStringProperty("zookeeper.connector");
        this.request_required_acks = props.getStringProperty("request.required.acks", "1");
        this.produce_thread_count = props.getIntProperty("produce_thread_count", 0);
        this.send_thread_count = props.getIntProperty("send_thread_count", 0);
        this.initialCount = props.getIntProperty("initialCount", 0);
        this.topic = props.getStringProperty("topic");
        this.print_progress = props.getBooleanProperty("print_progress", true);
        this.print_progress_strategy = props.getStringProperty("print_progress_strategy");
        this.print_progress_interval = props.getIntProperty("print_progress_interval", 1000);
        this.msg_generate_class = props.getStringProperty("msg_generate_class");
    }

    public Integer getTotalCount() {
        return totalCount;
    }

    public String getBootstrapServers() {
        return bootstrap_servers;
    }

    public String getZookeeperConnector() {
        return zookeeper_connector;
    }

    public String getRequestRequiredAcks() {
        return request_required_acks;
    }

    public Integer getProduceThreadNum() {
        return produce_thread_count;
    }

    public Integer getSendThreadNum() {
        return send_thread_count;
    }

    public Integer getiInitialCount() {
        return initialCount;
    }

    public String getTopic() {
        return topic;
    }

    public String getMsgGenerateClass() {
        return msg_generate_class;
    }

    public boolean printProgress() {
        return print_progress;
    }

    public String getProgressStrategy() {
        return print_progress_strategy;
    }

    public int getProgressInterval() {
        return print_progress_interval;
    }
}

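package com.core.constant;

public class Constant {

    // Progress marker printed for each produced message
    public final static String PRODUCER_SIGN = ">";

    // Progress marker printed for each sent message
    public final static String CONSUMER_SIGN = "<";
}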

package com.core.exception;

/**
 * @description Base class for all runtime exceptions
 * @author lihong10 2018/4/16 13:44:00
 */
public class BaseRuntimeException extends RuntimeException {

    /**
     * Serialization ID
     */
    private static final long serialVersionUID = 7830353921973771800L;

    /**
     * Error code
     */
    protected Integer errorCode;

    /**
     * Creates a new BaseRuntimeException
     * @param errorCode
     * @param msg
     */
    public BaseRuntimeException(int errorCode, String msg) {
        super(msg);
        this.errorCode = errorCode;
    }

    /**
     * Creates a new BaseRuntimeException
     * @param errorCode
     * @param msg
     * @param cause
     */
    public BaseRuntimeException(int errorCode, String msg, Throwable cause) {
        super(msg, cause);
        this.errorCode = errorCode;
    }

    public BaseRuntimeException(String msg, Throwable cause) {
        super(msg, cause);
    }

    public BaseRuntimeException(String msg) {
        super(msg);
    }

    public Integer getErrorCode() {
        return errorCode;
    }

    public void setErrorCode(Integer errorCode) {
        this.errorCode = errorCode;
    }
}

package com.core.exception;

/**
 * @description Thrown for illegal parameters, e.g. a parameter that is empty or has the wrong format
 * @author lihong10 2018/4/16 11:34:00
 */
public class IllegalParameterException extends BaseRuntimeException {

    public IllegalParameterException(int errorCode, String msg) {
        super(errorCode, msg);
    }

    public IllegalParameterException(int errorCode, String msg, Throwable cause) {
        super(errorCode, msg, cause);
    }

    public IllegalParameterException(String msg, Throwable cause) {
        super(msg, cause);
    }

    public IllegalParameterException(String msg) {
        super(msg);
    }
}

package com.core.progress;

import com.core.constant.Config;

public class IntervalReporter extends Reporter {

    public void report(int progress, Config config, boolean isProducer) {

        if (config.getProgressInterval() > 0) {
            if (progress % config.getProgressInterval() == 0) {
                print(getSign(isProducer), false);
            }
            return;
        }

        // interval <= 0 means no interval: fall back to the ONE_BY_ONE strategy
        print(getSign(isProducer), false);
    }
}

package com.core.progress;

import com.core.constant.Config;

public class OneByOneReporter extends Reporter {

    public void report(int progress, Config config, boolean isProducer) {

        print(getSign(isProducer), false);
    }
}

package com.core.progress;

import com.core.constant.Config;

public class PercentageReporter extends Reporter {

    public void report(int progress, Config config, boolean isProducer) {

        if (config.getProgressInterval() > 0) {
            if (progress % config.getProgressInterval() == 0) {

                print(getSign(isProducer, progress, config.getTotalCount()), false);
            }
            return;
        }

        // interval <= 0 means no interval: report after every message
        print(getSign(isProducer, progress, config.getTotalCount()), false);
    }


    private String getSign(boolean isProducer, int progress, int total) {
        // Two decimal places are enough for a progress display.
        String percent = String.format("%.2f%%", (progress * 100.0) / total);
        if (isProducer) {
            return "(" + getSign(isProducer) + " " + percent + ")  ";
        }

        return "(" + percent + " " + getSign(isProducer) + ")  ";
    }
}


package com.core.progress;

import com.core.constant.Config;
import org.apache.commons.lang3.StringUtils;

public class ProgressReportClient {

    private final static OneByOneReporter oneByeOne = new OneByOneReporter();
    private final static IntervalReporter interval = new IntervalReporter();
    private final static PercentageReporter percentage = new PercentageReporter();

    public static void report(int progress, Config config, boolean isProducer) {
        if (!config.printProgress()) {
            return;
        }

        // A blank strategy defaults to ONE_BY_ONE.
        ProgressReportType type = null;
        String strategy = config.getProgressStrategy();
        if (StringUtils.isBlank(strategy)) {
            type = ProgressReportType.ONE_BY_ONE;
        } else {
            type = ProgressReportType.valueOf(strategy);
        }

        switch (type) {
            case INTERVAL:
                interval.report(progress, config, isProducer);
                break;
            case ONE_BY_ONE:
                oneByeOne.report(progress, config, isProducer);
                break;
            case PERCENTAGE:
                percentage.report(progress, config, isProducer);
                break;
            default:
                oneByeOne.report(progress, config, isProducer);
        }
    }
}

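One thing to keep in mind about the dispatch above: `Enum.valueOf` throws `IllegalArgumentException` when the configured name does not match a constant exactly, so a typo or a lower-case value in print_progress_strategy would surface as an exception at the first progress report. The following is a minimal sketch of a more forgiving lookup. The class `StrategyParseSketch` and its nested enum are hypothetical stand-ins for `ProgressReportType`, and falling back to ONE_BY_ONE is an assumption modeled on how the program treats a blank value.

```java
import java.util.Locale;

// Hypothetical sketch: tolerant parsing of the progress strategy name.
public class StrategyParseSketch {

    /** Stand-in for the project's ProgressReportType enum. */
    public enum Strategy { ONE_BY_ONE, INTERVAL, PERCENTAGE }

    /** Returns the matching strategy, or ONE_BY_ONE for blank or unknown names. */
    public static Strategy parse(String raw) {
        if (raw == null || raw.trim().isEmpty()) {
            return Strategy.ONE_BY_ONE;
        }
        try {
            // Normalize case so "interval" and "INTERVAL" are treated alike.
            return Strategy.valueOf(raw.trim().toUpperCase(Locale.ROOT));
        } catch (IllegalArgumentException e) {
            return Strategy.ONE_BY_ONE;
        }
    }

    public static void main(String[] args) {
        System.out.println(parse("interval"));  // prints INTERVAL
        System.out.println(parse("no-such"));   // prints ONE_BY_ONE
    }
}
```

Whether a silent fallback or a fail-fast error at startup is preferable depends on how the tool is used; failing in `APP.check` would be the stricter alternative.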
package com.core.progress;

public enum ProgressReportType {

    ONE_BY_ONE,
    INTERVAL,
    PERCENTAGE
}

package com.core.progress;

import com.core.constant.Config;
import com.core.constant.Constant;

public abstract class Reporter {

    public static void print(Object obj, boolean newLine) {
        if (newLine) {
            System.out.println(obj);
        } else {
            System.out.print(obj);
        }
    }

    public static String getSign(boolean isProducer) {
        if (isProducer) {
            return Constant.PRODUCER_SIGN;
        } else {
            return Constant.CONSUMER_SIGN;
        }
    }

    public abstract void report(int progress, Config config, boolean isProducer);
}

package com.core.service;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class Container<E> {

    private BlockingQueue<E> pool;
    private final AtomicInteger produceCount = new AtomicInteger(0);
    private final AtomicInteger consumeCount = new AtomicInteger(0);
    private int num;

    public Container(int num) {
        this.num = num;
        this.pool = new ArrayBlockingQueue<E>(num);
    }

    public int addConsumeCount(int num) {
        return consumeCount.addAndGet(num);
    }

    public E get() {
        try {
            // consumeCount is not incremented here; it is incremented in the send-success callback
            E msg = pool.take();
            return msg;
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        return null;
    }

    public void offer(E msg) {
        try {
            // Blocks while the queue is full.
            pool.put(msg);
            produceCount.incrementAndGet();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public int getProduceCount() {
        return produceCount.get();
    }

    public int getConsumeCount() {
        return consumeCount.get();
    }
}

package com.core.service;

import com.core.constant.Config;

public interface MsgGenerator {

    public String getMsg(Config config);
}

package com.core.service;

import com.core.constant.Config;
import com.core.exception.IllegalParameterException;
import com.core.progress.ProgressReportClient;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.io.UnsupportedEncodingException;
import java.util.concurrent.CountDownLatch;

public class ProduceThread implements Runnable {

    private Config config;
    private Container<ProducerRecord<byte[], byte[]>> container;
    private volatile CountDownLatch produceSwitch;
    private MsgGenerator msgGenerator;

    public ProduceThread(Config config, Container<ProducerRecord<byte[], byte[]>> container, CountDownLatch produceSwitch) {
        this.config = config;
        this.container = container;
        this.produceSwitch = produceSwitch;

        try {
            msgGenerator = (MsgGenerator) Class.forName(config.getMsgGenerateClass()).newInstance();
        } catch (Exception e) {
            throw new IllegalParameterException("Cannot load the message generator class", e);
        }
    }

    public void run() {
        for (; ; ) {

            if (container.getProduceCount() >= config.getiInitialCount()) {
                if (produceSwitch != null) {
                    produceSwitch.countDown();
                    // Prevent repeated countDown() calls within [initialCount, totalCount], which would wake the sender threads too early
                    produceSwitch = null;
                }
            }

            if (container.getProduceCount() > config.getTotalCount()) {
                break;
            }

            try {
                // The generator is created once in the constructor, not once per message.
                String msg = msgGenerator.getMsg(config);
                ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(config.getTopic(), msg.getBytes("UTF-8"));
                container.offer(record);
                ProgressReportClient.report(container.getProduceCount(), config, true);
            } catch (UnsupportedEncodingException e) {
                e.printStackTrace();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

package com.core.service;

import com.core.constant.Config;
import com.core.progress.ProgressReportClient;
import org.apache.kafka.clients.producer.*;

import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class SendThread implements Runnable {

    private Config config;
    private Container<ProducerRecord<byte[], byte[]>> container;
    private static Properties kafkaProperties = new Properties();
    private static KafkaProducer<byte[], byte[]> kafkaProducer = null;
    private CountDownLatch consumeStartSwitch;
    private CountDownLatch consumeEndSwitch;

    public SendThread(Config config) {
        this.config = config;
    }

    public SendThread(Config config, Container<ProducerRecord<byte[], byte[]>> container, CountDownLatch consumeStartSwitch, CountDownLatch consumeEndSwitch) {
        this(config);
        this.container = container;
        this.consumeStartSwitch = consumeStartSwitch;
        this.consumeEndSwitch = consumeEndSwitch;

        // All sender threads share one producer. They are constructed one after
        // another on the main thread, so a plain null check is enough here.
        if (kafkaProducer == null) {
            kafkaProperties.put("bootstrap.servers", config.getBootstrapServers());
            kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
            kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
            // The Java KafkaProducer reads the acknowledgement setting from "acks".
            kafkaProperties.put(ProducerConfig.ACKS_CONFIG, config.getRequestRequiredAcks());
            // These two keys are not recognized by the Java KafkaProducer, which only logs a warning for unknown keys.
            kafkaProperties.put("zookeeper.connector", config.getZookeeperConnector());
            kafkaProperties.put("serializer.class", "kafka.serializer.StringEncoder");
            kafkaProducer = new KafkaProducer<>(kafkaProperties);
        }
    }


    public void run() {

        try {
            // Wait until the main thread releases the senders.
            consumeStartSwitch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        for (;;) {

            if (container.getConsumeCount() >= config.getTotalCount()) {
                break;
            }

            ProducerRecord<byte[], byte[]> record = container.get();

            if (record != null) {
                kafkaProducer.send(record, new Callback() {
                    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                        if (e != null) {
                            e.printStackTrace();
                            return;
                        }

                        // Count only messages that were sent successfully, and report the sent count.
                        int sent = container.addConsumeCount(1);
                        ProgressReportClient.report(sent, config, false);
                    }
                });
            }
        }

        consumeEndSwitch.countDown();
    }
}


package com.core.service;

import com.core.constant.Config;

public class SimpleMsgGenerator implements MsgGenerator {

    public String getMsg(Config config) {
        return "hello kafka";
    }
}

package com.core.service;

import org.apache.kafka.clients.producer.*;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.io.UnsupportedEncodingException;
import java.util.Properties;

public class ToKafkaSender {

    private static Properties kafkaProperties = new Properties();
    private static KafkaProducer<byte[], byte[]> kafkaProducer = null;

    @Before
    public void init() {

        String ip = "";
        /*
         * Used for bootstrapping: the producer only uses it to fetch metadata (topic, partition, replicas).
         * The sockets that actually carry the messages are chosen from the returned metadata.
         */
        // kafkaProperties.put("metadata.broker.list", "vsp13:9092");
        kafkaProperties.put("bootstrap.servers", ip.trim() + ":9092");
        kafkaProperties.put("zookeeper.connector", ip.trim() + ":2181");
        kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
        kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");

        kafkaProperties.put("serializer.class", "kafka.serializer.StringEncoder");
        /*
         * Whether the producer waits for the broker's ACK after sending a message.
         * 1 waits for the ACK, which makes delivery more reliable.
         */
        kafkaProperties.put(ProducerConfig.ACKS_CONFIG, "1");
        kafkaProducer = new KafkaProducer<>(kafkaProperties);
    }

    @Test
    public void testSend() {
        long now = System.currentTimeMillis();
        for (int i = 0; i < 20; i++) {
            send();
        }
        System.out.println(System.currentTimeMillis() - now);
    }

    private void send() {
        String msg = getMsg();
        ProducerRecord<byte[], byte[]> record = null;

        try {
            // MY_TOPIC is a topic in the Kafka cluster
            record = getProducerRecord("MY_TOPIC", msg.getBytes("UTF-8"));
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }

        kafkaProducer.send(record, new Callback() {
            public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                if (e != null) {
                    e.printStackTrace();
                }
            }
        });
    }

    @After
    public void clean() {
        if (kafkaProducer != null) {
            try {
                kafkaProducer.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    private String getMsg() {
        return "hello kafka !";
    }

    private ProducerRecord<byte[], byte[]> getProducerRecord(String topic, byte[] msg) {
        ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic, msg);
        return record;
    }
}

package com.core.util;

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.List;
import java.util.Map;
import java.util.TimeZone;

public class JsonUtils {
    private static final Logger LOG = LoggerFactory.getLogger(JsonUtils.class);
    private static ObjectMapper objectMapper = new ObjectMapper();

    public static String toJsonWithFormat(Object obj, String dateFormat) {
        if (obj == null) {
            return null;
        }
        String result = null;
        try {
            // A fresh mapper keeps the custom date format from leaking into the shared one.
            ObjectMapper objectMapper = new ObjectMapper();
            if (StringUtils.isNotEmpty(dateFormat)) {
                objectMapper.setDateFormat(new SimpleDateFormat(dateFormat));
                TimeZone timeZone = TimeZone.getTimeZone("GMT+8");
                objectMapper.setTimeZone(timeZone);
            }
            result = objectMapper.writeValueAsString(obj);
        } catch (IOException e) {
            LOG.error("error when converting object to JSON", e);
        }
        return result;
    }

    public static String object2Json(Object obj) {
        if (obj == null) {
            return null;
        }
        String result = null;
        try {
            result = objectMapper.writeValueAsString(obj);
        } catch (IOException e) {
            LOG.error("error when converting object to JSON", e);
        }
        return result;
    }

    public static String object2Json(Object obj, boolean indented) {

        if (obj == null) {
            return null;
        }
        String result = null;
        try {
            ObjectMapper objectMapper = new ObjectMapper();
            if (indented) {
                result = objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(obj);
            } else {
                result = objectMapper.writeValueAsString(obj);
            }
        } catch (IOException e) {
            LOG.error("error when object to json", e);
        }
        return result;
    }

    public static Map<?, ?> jsonToMap(String json) {
        return json2Object(json, Map.class);
    }

    public static <T> T json2Object(String json, Class<T> cls) {
        T result = null;
        try {
            objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
            result = objectMapper.readValue(json, cls);
        } catch (IOException e) {
            LOG.error("error when converting JSON to object", e);
        }

        return result;
    }

    public static <T> T conveterObject(Object srcObject, Class<T> destObjectType) {
        String jsonContent = object2Json(srcObject);
        return json2Object(jsonContent, destObjectType);
    }

    public static <T> List<T> fromJsonList(String json, Class<T> clazz) throws IOException {
        return objectMapper.readValue(json, objectMapper.getTypeFactory().constructCollectionType(List.class, clazz));
    }
}

package com.core.util;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.*;
import java.util.Properties;

public class PropertiesFile {

    private final static Logger LOG = LoggerFactory.getLogger(PropertiesFile.class);
    private Properties p;
    private String fileName;

    /**
     * @param fileName name of the properties file to load, with a path if necessary
     * @param outside  true to read from the file system, false to read from the classpath
     * @author lihong10 2015-4-14 11:19:41
     * @since v1.0
     */
    public PropertiesFile(String fileName, boolean outside) {
        this.p = new Properties();
        this.fileName = fileName;

        InputStream inputStream = null;
        try {
            if (outside) {
                inputStream = getInputStreamByFile(fileName);
            } else {
                inputStream = getInputStream(Thread.currentThread().getContextClassLoader(), fileName);
                if (inputStream == null) {
                    inputStream = getInputStream(PropertiesFile.class.getClassLoader(), fileName);
                }
            }
            p.load(inputStream);
        } catch (Exception ex) {
            throw new RuntimeException("Config file not found: " + fileName, ex);
        } finally {
            if (inputStream != null) {
                try {
                    inputStream.close();
                } catch (IOException e) {
                    LOG.error("failed to close the file stream", e);
                }
            }
        }
    }

    public static InputStream getInputStreamByFile(String path) {
        File file = new File(path);
        if (!file.isFile() || !file.exists()) {
            throw new IllegalArgumentException("file " + path + " does not exist");
        }

        InputStream in = null;
        try {
            in = new FileInputStream(file);
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        }

        return in;
    }

    public static InputStream getInputStream(ClassLoader classLoader, String fileName) {
        if (classLoader == null || StringUtils.isBlank(fileName)) {
            LOG.info("classLoader is null or fileName is null");
            return null;
        }

        fileName = fileName.trim();

        InputStream stream = null;
        try {
            stream = classLoader.getResourceAsStream(fileName);
        } catch (Exception e) {
            LOG.error("read " + fileName + " error", e);
        }

        if (stream == null && !fileName.startsWith("/")) {
            try {
                stream = classLoader.getResourceAsStream("/" + fileName);
            } catch (Exception e) {
                LOG.error("read /" + fileName + " error", e);
            }
        }
        return stream;
    }

    /**
     * @param propertyName
     * @return property value
     * @author lihong10 2015-4-14 11:22:23
     * @since v1.0
     */
    public String getStringProperty(String propertyName) {
        return p.getProperty(propertyName);
    }

    public String getStringProperty(String propertyName, String dft) {
        String value = p.getProperty(propertyName);
        if (StringUtils.isBlank(value)) {
            return dft;
        }
        return value;
    }

    public Integer getIntProperty(String propertyName, Integer dft) {
        String raw = p.getProperty(propertyName);
        return getInt(raw, dft);
    }

    public Long getLongProperty(String propertyName, Long dft) {
        String raw = p.getProperty(propertyName);
        return getLong(raw, dft);
    }

    public Boolean getBooleanProperty(String propertyName, Boolean dft) {
        String raw = p.getProperty(propertyName);
        return getBoolean(raw, dft);
    }

    /**
     * @param propertyName
     * @param propertyValue
     * @author lihong10 2015-6-15 16:16:54
     * @since v1.0
     */
    public void setProperty(String propertyName, String propertyValue) {
        p.setProperty(propertyName, propertyValue);
    }

    /**
     * @return the Properties
     */
    public Properties getProps() {
        return p;
    }

    /**
     * @return the fileName
     */
    public String getFileName() {
        return fileName;
    }

    // A null or malformed value ends up in the catch block and yields the default.
    private Integer getInt(String str, Integer dft) {
        try {
            return Integer.parseInt(str.trim());
        } catch (Exception e) {
            LOG.error("error when parsing " + str + " to int, use default value: " + dft);
            return dft;
        }
    }

    private Long getLong(String str, Long dft) {
        Long value = null;
        try {
            value = Long.parseLong(str.trim());
        } catch (Exception e) {
            LOG.error("error when parsing " + str + " to long, use default value: " + dft);
            return dft;
        }

        return (value == null) ? dft : value;
    }

    private Boolean getBoolean(String str, Boolean dft) {
        try {
            return Boolean.parseBoolean(str.trim());
        } catch (Exception e) {
            LOG.error("error when parsing " + str + " to bool, use default value: " + dft);
            return dft;
        }
    }
}


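A detail of `getBoolean` above is easy to miss: `Boolean.parseBoolean` never throws for a non-null string and returns false for anything other than "true" (ignoring case), so the default value is only used when the key is missing altogether. A value such as `print_progress=yes` is therefore read as false. Below is a minimal sketch of a stricter variant; the class `StrictBooleanSketch` and its `parse` method are hypothetical and not part of the project.

```java
// Hypothetical sketch: fall back to the default for anything that is
// neither "true" nor "false", instead of silently treating it as false.
public class StrictBooleanSketch {

    public static boolean parse(String raw, boolean dft) {
        if (raw == null) {
            return dft;                 // key missing
        }
        String v = raw.trim();
        if (v.equalsIgnoreCase("true")) {
            return true;
        }
        if (v.equalsIgnoreCase("false")) {
            return false;
        }
        return dft;                     // unrecognized text such as "yes"
    }

    public static void main(String[] args) {
        System.out.println(parse("yes", true));      // prints true (default used)
        System.out.println(parse(" FALSE ", true));  // prints false
    }
}
```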
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">




        <!-- skipTests still compiles the test classes (the .class files are generated) but does not run them; you can run the tests manually. -->


        <!-- unit tests -->

        <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-lang3 -->






