游戏项目1-1
游戏项目开始了
一、整体架构
二、采集数据
1、将Flume部署在Windows上 2、保证数据不重复 3、支持断点续传 4、自定义拦截器、转换器 5、解决中文乱码问题 6、将采集的数据写入Kafka
三、学习flume
Flume官网:https://flume.apache.org
四、开始写代码
1、新建一个Maven工程
2、在pom.xml中添加依赖和打包插件配置
<dependencies>
    <!-- JSON library. Versions before 1.2.83 (including the previously used 1.2.35) are
         vulnerable to the fastjson autoType deserialization bypass (CVE-2022-25845);
         1.2.83 is the patched release of the 1.x line. -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.83</version>
    </dependency>
    <!-- Flume core API. "provided": available at compile time but not bundled into the
         packaged jar, because the Flume runtime already ships it. -->
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
        <version>1.7.0</version>
        <scope>provided</scope>
    </dependency>
    <!-- Taildir source module; provided by the Flume runtime. -->
    <dependency>
        <groupId>org.apache.flume.flume-ng-sources</groupId>
        <artifactId>flume-taildir-source</artifactId>
        <version>1.7.0</version>
        <scope>provided</scope>
    </dependency>
    <!-- Kafka channel module; provided by the Flume runtime. -->
    <dependency>
        <groupId>org.apache.flume.flume-ng-channels</groupId>
        <artifactId>flume-kafka-channel</artifactId>
        <version>1.7.0</version>
        <scope>provided</scope>
    </dependency>
</dependencies>
<!-- Build a shaded (single, self-contained) jar during the "package" phase.
     Dependencies with "provided" scope are not included in it. -->
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>2.4.3</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
3、创建Java类:TailFileSource(自定义Source)
public class TailFileSource extends AbstractSource implements EventDrivenSource, Configurable { private static final Logger logger = LoggerFactory.getLogger(TailFileSource.class); private String filePath; private String posiFile; private String charset; private Long interval; private ExecutorService executor; private FileRunner fileRunner; /** * 根据context读取配置文件 * 执行时机:构造器之后,start方法之前会执行一次configure方法 * * @param context */ @Override public void configure(Context context) { //监听那个文件 filePath = context.getString("filePath"); //保存便宜量的文件 posiFile = context.getString("posiFile"); //读取文件的周期 interval = context.getLong("interval"); //字符集 charset = context.getString("charset", "UTF-8"); } /** * flume启动时时候会调用一次start方法 */ @Override public synchronized void start() { //创建一个单线程的线程池 executor = Executors.newSingleThreadExecutor(); fileRunner = new FileRunner(filePath, posiFile, charset, interval, getChannelProcessor()); executor.execute(fileRunner); super.start(); } /** * flume停止之前会调用stop方法 */ @Override public synchronized void stop() { fileRunner.setStarted(false); //关闭线程池 executor.shutdown(); while (!executor.isTerminated()) { logger.debug("Waiting for exec executor service to stop"); try { executor.awaitTermination(500, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { logger.debug("Interrupted while waiting for exec executor service " + "to stop. 
Just exiting."); Thread.currentThread().interrupt(); } } super.stop(); } private static class FileRunner implements Runnable { private boolean started = true; private String charset; private Long interval; private long offset = 0; private ChannelProcessor channelProcessor; private RandomAccessFile raf; private File positionFile; public FileRunner(String filePath, String posiFile, String charset, Long interval, ChannelProcessor channelProcessor) { this.charset = charset; this.interval = interval; this.channelProcessor = channelProcessor; try { //创建RandomAccessFile raf = new RandomAccessFile(filePath, "r"); //读取以前的偏移量 positionFile = new File(posiFile); if (!positionFile.exists()) { positionFile.createNewFile(); } //读取偏移量文件中的偏移量 String offsetStr = FileUtils.readFileToString(positionFile); if (offsetStr != null && !offsetStr.equals("")) { offset = Long.parseLong(offsetStr); raf.seek(offset); } } catch (FileNotFoundException e) { logger.error("init RandomAccessFile error", e); } catch (IOException e) { logger.error("create posifile error", e); } } @Override public void run() { while (started) { //读取文件是否有新内容 //跳到指定的偏移量 try { String line = raf.readLine(); if (line != null) { //乱码问题就是RandomAccessFile读取数据总是用ISO-8859-1编码来读的 line = new String(line.getBytes("ISO-8859-1"), charset); //有新内容,将新内容方法channel //将数据构建成Event发送给Channel channelProcessor.processEvent(EventBuilder.withBody(line.getBytes())); //获取到当前文件的偏移量,然后更新偏移量 offset = raf.getFilePointer(); //将偏移量信息记录到偏移量文件中 FileUtils.writeStringToFile(positionFile, offset + ""); } else { //睡觉(指定的时间间隔) Thread.sleep(interval); } } catch (InterruptedException e) { logger.error("read file thread exception", e); } catch (IOException e) { logger.error("seek to file exception", e); } } } public void setStarted(boolean started) { this.started = started; } } } |
4、Flume自定义Source对接Kafka