Jmeter Kafka插件开发之Sampler篇

Jmeter Kafka插件开发之Sampler篇

背景:在项目中经常需要往Kafka里写数据,并用生成的数据执行后续操作,有些接口也需要用到Kafka里的数据。小编使用JMeter做接口测试,在网上找到一个基于Java Sampler的插件,但界面不太友好,故萌发了自己写一个JMeter Kafka插件的想法。

  界面如下:

Jmeter Kafka插件开发之Sampler篇

  Ps:小编公司对kafka进行了二次封装,不是直接以IP方式写数据,而是将ip反向代理到了域名,请求需要带上校验码才能请求,出于隐私考虑隐藏了部分输入框无关紧要的信息。

 

需要建一个gui包和一个sampler包,Jmeter在加载插件的时候会加载gui里面的类,可以参考JMeter源码的包命名方式,以下是项目目录结构:

Jmeter Kafka插件开发之Sampler篇

 

 

实现方式:

  1. 继承AbstractSamplerGui类
  2. 重写createTestElement()方法:此方法应创建TestElement类的新实例,然后将其传递给modifyTestElement(TestElement) 方法
  3. 重写configure()方法:通过调用此方法,可以用测试元素对象的内容初始化新创建的GUI组件
  4. 重写modifyTestElement()方法:将GUI元素中的数据移动到TestElement中

主要重写2、3、4这3个方法,其他的包括重写clearGui()用于恢复GUI到初始状态,getStaticLabel()返回插件名称

  5. 引入ApacheJMeter_java、ApacheJMeter_core以及需要使用的其他依赖库

GUI类主要负责输入数据然后将数据传送到TestElement中,然后sampler类拿到数据进行业务逻辑处理。

GUI代码如下:

package com.sf.jmeter.gui;

import com.sf.jmeter.sampler.SFKafkaSampler;
import org.apache.jmeter.gui.util.JSyntaxTextArea;
import org.apache.jmeter.gui.util.JTextScrollPane;
import org.apache.jmeter.gui.util.VerticalPanel;
import org.apache.jmeter.samplers.gui.AbstractSamplerGui;
import org.apache.jmeter.testelement.TestElement;
import org.apache.jorphan.gui.layout.VerticalLayout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.swing.*;
import java.awt.*;
import java.awt.event.ActionEvent;
import java.awt.event.ActionListener;

public class SFKafkaSamplerUI extends AbstractSamplerGui implements ActionListener {
    private static final long serialVersionUID = 1L;
    private static final Logger log = LoggerFactory.getLogger(SFKafkaSamplerUI.class);

    private JLabel clusterAddressLable;
    private JTextField clusterAddressField;

    private JLabel clusterNameLabel;
    private JTextField clusterNameField;

    private JLabel topicNameLabel;
    private JTextField topicNameField;

    private JLabel chekCodeLabel;
    private JTextField chekCodeField;


    private JSyntaxTextArea textMessage = new JSyntaxTextArea(40, 50);

    private JLabel textArea = new JLabel();
    private JTextScrollPane textPanel = new JTextScrollPane(textMessage);

    private JLabel kafkaLabel;
    private JRadioButton readWriteMode;
    private JRadioButton writeMode;
    private JRadioButton readMode;


    Box clusterAddressPanel, clusterNamePanel, topicNamePanel, chekCodePanel, kafkaPanel;
    static Box filterPanel, readNumPanel;

    JPanel mainPanel = new VerticalPanel();
    JPanel extendPanel = new VerticalPanel();
    static JPanel contentPanel;


    public SFKafkaSamplerUI() {
        super();
        this.init();
    }

    //创建消息输入框
    private JPanel createContentPanel() {

        JPanel ContentPanel = new VerticalPanel();
        JPanel messageContentPanel = new JPanel(new BorderLayout());
        messageContentPanel.add(this.textArea, BorderLayout.NORTH);
        messageContentPanel.add(this.textPanel, BorderLayout.CENTER);
        ContentPanel.add(messageContentPanel);
        ContentPanel.setBorder(BorderFactory.createTitledBorder(BorderFactory.createLineBorder(Color.gray), "写入数据"));

        return ContentPanel;
    }

    private Box createTextFiledPanle(String name) {

        JLabel jLabel = new JLabel(name);
        JTextField jTextField = new JTextField(6);

        Box box = Box.createHorizontalBox();
        box.add(jLabel);
        box.add(jTextField);
        return box;
    }


    private void init() {
        log.info("初始化UI界面");

        clusterAddressLable = new JLabel("集群地址:");
        clusterAddressField = new JTextField(6);

        clusterNameLabel = new JLabel("集群名称:");
        clusterNameField = new JTextField(6);

        topicNameLabel = new JLabel("主题名称:");
        topicNameField = new JTextField(6);

        chekCodeLabel = new JLabel("校验码:");
        chekCodeField = new JTextField(6);

        kafkaLabel = new JLabel("kafka:");

        readWriteMode = new JRadioButton("读写模式");
        writeMode = new JRadioButton("写模式");
        readMode = new JRadioButton("读模式");

        kafkaPanel = Box.createHorizontalBox();

        kafkaPanel.add(kafkaLabel);
        kafkaPanel.add(readWriteMode);
        kafkaPanel.add(writeMode);
        kafkaPanel.add(readMode);

        //设置默认读写方式
        writeMode.setSelected(true);
        readWriteMode.setSelected(false);
        readMode.setSelected(false);

        contentPanel = createContentPanel();
        contentPanel.setVisible(true);
        filterPanel = createTextFiledPanle("筛选条件");

        readNumPanel = createTextFiledPanle("读取条数");
        setLayout(new VerticalLayout(5, VerticalLayout.BOTH, VerticalLayout.TOP));

        setBorder(makeBorder());
        add(makeTitlePanel());

        clusterAddressPanel = Box.createHorizontalBox();

        clusterAddressPanel.add(clusterAddressLable);
        clusterAddressPanel.add(clusterAddressField);

        clusterNamePanel = Box.createHorizontalBox();

        clusterNamePanel.add(clusterNameLabel);
        clusterNamePanel.add(clusterNameField);

        topicNamePanel = Box.createHorizontalBox();

        topicNamePanel.add(topicNameLabel);
        topicNamePanel.add(topicNameField);

        chekCodePanel = Box.createHorizontalBox();

        chekCodePanel.add(chekCodeLabel);
        chekCodePanel.add(chekCodeField);


        mainPanel.add(clusterAddressPanel);
        mainPanel.add(clusterNamePanel);
        mainPanel.add(topicNamePanel);
        mainPanel.add(chekCodePanel);
        mainPanel.add(kafkaPanel);

        extendPanel.add(contentPanel);
        extendPanel.add(filterPanel);
        extendPanel.add(readNumPanel);
        extendPanel.setVisible(true);
        add(mainPanel, BorderLayout.CENTER);
        add(extendPanel, BorderLayout.CENTER);


    }

    /**
     * 此方法应创建TestElement类的新实例,然后将其传递给modifyTestElement(TestElement) 方法
     *
     * @return
     */
    @Override
    public TestElement createTestElement() {
        SFKafkaSampler sampler = new SFKafkaSampler();
        modifyTestElement(sampler);
        return sampler;
    }

    @Override
    public void clearGui() {
        super.clearGui();
        clusterAddressField.setText("");
        clusterNameField.setText("");
        topicNameField.setText("");
        chekCodeField.setText("");


    }

    /**
     * 一定要调用super.configure(e)。这将为您填充一些数据,例如元素的名称。
     * 使用此方法将数据设置到GUI元素中
     * 通过调用此方法,可以用测试元素对象的内容初始化新创建的GUI组件。组件负责查询测试元素对象,以获取要在其GUI中显示的相关信息。
     *
     * @param element
     */
    @Override
    public void configure(TestElement element) {
        super.configure(element);

        SFKafkaSampler sampler = (SFKafkaSampler) element;
        clusterAddressField.setText(sampler.getClusterAddress());
        log.info("设置集群地址为:" + sampler.getClusterAddress());
        clusterNameField.setText(sampler.getClusterName());
        log.info("设置集群名称为:" + sampler.getClusterName());

        topicNameField.setText(sampler.getTopicName());
        log.info("设置集群主题为:" + sampler.getTopicName());

        chekCodeField.setText(sampler.getChekCode());
        log.info("设置校验码为:" + sampler.getChekCode());

        textMessage.setText(sampler.getMessage());
        log.info("设置发送消息为:" + sampler.getMessage());

    }


    @Override
    public String getStaticLabel() {
        return "SFKafka Sampler";
    }

    @Override
    public String getLabelResource() {
        throw new IllegalStateException("This shouldn't be called");
    }

    /**
     * 将GUI元素中的数据移动到TestElement,
     *
     * @param e
     */
    @Override
    public void modifyTestElement(TestElement e) {
        e.clear();
        // 调用super.configureTestElement(e)。这将处理一些默认数据
        configureTestElement(e);

        SFKafkaSampler sampler = new SFKafkaSampler();

        ((SFKafkaSampler) e).setClusterAddress(clusterAddressField.getText());
        log.info("填入的集群地址为:" + this.clusterAddressField.getText());

        ((SFKafkaSampler) e).setClusterName(clusterNameField.getText());
        log.info("填入的集群地址为:" + this.clusterAddressField.getText());

        ((SFKafkaSampler) e).setTopicName(topicNameField.getText());
        log.info("填入的主题名称地址为:" + this.topicNameField.getText());

        ((SFKafkaSampler) e).setMessage(textMessage.getText());
        log.info("填入的主题名称地址为:" + this.topicNameField.getText());

        ((SFKafkaSampler) e).setChekCode(chekCodeField.getText());
        log.info("填入的校验码为:" + this.chekCodeField.getText());

        ((SFKafkaSampler) e).setTopicTokens(topicNameField.getText() + ":" + chekCodeField.getText());
        log.info("填入的校验码为:" + sampler.getTopicTokens());

        //添加监听
        readWriteMode.addActionListener(this);
        readMode.addActionListener(this);
        writeMode.addActionListener(this);


    }

    @Override
    public void actionPerformed(ActionEvent e) {
        if (e.getSource() == readWriteMode) {
            readWriteMode.setSelected(true);
            readMode.setSelected(false);
            writeMode.setSelected(false);
            log.info("读写模式新增contentPanel");

            contentPanel.setVisible(true);

            log.info("读写模式新增filterPanel");

            filterPanel.setVisible(true);

            log.info("读写模式新增readNumPanel");

            readNumPanel.setVisible(true);


            extendPanel.setVisible(true);
            updateUI();
            repaint();

        }
        if (e.getSource() == writeMode) {

            writeMode.setSelected(true);
            readWriteMode.setSelected(false);
            readMode.setSelected(false);

            log.info("写模式新增contentPanel");
            contentPanel.setVisible(true);

            filterPanel.setVisible(false);

            readNumPanel.setVisible(false);

            extendPanel.setVisible(false);
            extendPanel.setVisible(true);

            updateUI();
            repaint();


        }
        if (e.getSource() == readMode) {
            readMode.setSelected(true);
            writeMode.setSelected(false);
            readWriteMode.setSelected(false);
            log.info("读模式新增filterPanel");

            contentPanel.setVisible(false);
            filterPanel.setVisible(true);

            log.info("读模式新增readNumPanel");
            readNumPanel.setVisible(true);

            extendPanel.setVisible(true);

            updateUI();
            repaint();

        }

    }
}



sampler代码如下

package com.sf.jmeter.sampler;





import com.sf.kafka.api.produce.ProduceConfig;

import com.sf.kafka.api.produce.IKafkaProducer;



import com.sf.kafka.api.produce.ProducerPool;

import org.apache.jmeter.samplers.AbstractSampler;

import org.apache.jmeter.samplers.Entry;

import org.apache.jmeter.samplers.SampleResult;

import org.apache.jmeter.testelement.TestElement;



public class SFKafkaSampler extends AbstractSampler implements TestElement {

    private long serialVersionUID = 1L;



    public static final String CLUSTERADDRESS = "SFKafkaSampler.clusterAddress";

    public static final String CLUSTERNAME = "SFKafkaSampler.clusterName";

    public static final String TOPICNAME = "SFKafkaSampler.topicName";

    public static final String CHEKCODE = "SFKafkaSampler.chekCode";

    public static final String TOPICTOKENS = "SFKafkaSampler.topicTokens";

    public static final String MESSAGE = "SFKafkaSampler.message";



//    private String clusterAddress = "";

//    private String clusterName = "";

//    private String topicName = "";

//    private String chekCode = "";

//    private String topicTokens = topicName + ":" + chekCode;

//    private String message = "";



    public void setClusterAddress(String clusterAddress) {

        setProperty(CLUSTERADDRESS, clusterAddress);

    }



    public void setClusterName(String clusterName) {

        setProperty(CLUSTERNAME, clusterName);

    }



    public void setTopicName(String topicName) {

        setProperty(TOPICNAME, topicName);

    }



    public void setChekCode(String chekCode) {

        setProperty(CHEKCODE, chekCode);

    }



    public void setTopicTokens(String topicTokens) {

        setProperty(TOPICTOKENS, topicTokens);

    }



    public void setMessage(String message) {



        setProperty(MESSAGE, message);

    }





    public String getTopicTokens() {

        return getPropertyAsString(TOPICTOKENS);

    }



    public String getMessage() {

        return getPropertyAsString(MESSAGE);

    }



    public String getClusterAddress() {

        return getPropertyAsString(CLUSTERADDRESS);

    }



    public String getClusterName() {

        return getPropertyAsString(CLUSTERNAME);

    }



    public String getTopicName() {

        return getPropertyAsString(TOPICNAME);

    }



    public String getChekCode() {

        return getPropertyAsString(CHEKCODE);

    }





    public SFKafkaSampler() {

        setName("SFKafka Sampler");

    }



    @Override

    public SampleResult sample(Entry entry) {

        SampleResult result = new SampleResult();

        result.setSampleLabel(getName());

        try {

            result.sampleStart();

            //写入业务数据



            ProduceConfig produceConfig = new ProduceConfig(5, getClusterAddress(), getClusterName(), getTopicTokens());

            IKafkaProducer kafkaProducer = new ProducerPool(produceConfig);



            kafkaProducer.sendString(getTopicName(), getMessage());



            if (kafkaProducer != null) {

                kafkaProducer.close();

            }





            result.setSamplerData("请求集群地址为:" + getClusterAddress() + "\n" + "请求集群主题为:" + getTopicName() + "\n" + "请求token为:" + getTopicTokens() + "\n" + "请求消息为:" + getMessage());



            result.setResponseData(getMessage(), "utf-8");





            result.sampleEnd();

            result.setSuccessful(true);

            result.setResponseCodeOK();



        } catch (Exception e) {

            result.setSamplerData("请求集群地址为:" + getClusterAddress() + "\n" + "请求集群主题为:" + getTopicName() + "\n" + "请求token为:" + getTopicTokens() + "\n" + "请求消息为:" + getMessage());



            result.sampleEnd(); // stop stopwatch

            result.setSuccessful(false);

            result.setResponseMessage("Exception: " + e);

            // get stack trace as a String to return as document data

            java.io.StringWriter stringWriter = new java.io.StringWriter();

            e.printStackTrace(new java.io.PrintWriter(stringWriter));

            result.setResponseData(stringWriter.toString(), null);

            result.setDataType(org.apache.jmeter.samplers.SampleResult.TEXT);

            result.setResponseCode("FAILED");

        }

        return result;

    }



}


 

然后用maven打完包就可以放到Jmeter/lib/ext目录下,重启JMeter后新建Sampler可以查看到自己定义的SFKafka Sampler如下图:

Jmeter Kafka插件开发之Sampler篇

 

界面如下:

Jmeter Kafka插件开发之Sampler篇

 

测试一下是否成功

Jmeter Kafka插件开发之Sampler篇

 

经测试数据是成功写入到对应的kafka主题中了,到此就完成了Kafka插件的写入功能开发

 

 

参考文档:

  1. Jmeter 源码examples
  2. JMeter固定定时器(Constant Timer)源码
  3. https://github.com/BrightTag/kafkameter
  4. http://jmeter.apache.org/usermanual/jmeter_tutorial.html