用Apache Flink将数据推送到S3
问题描述:
我有一个小型测试项目来将数据推送到S3存储桶。但是,它看起来像我没有读取core-site.xml文件,因为我收到错误java.io.IOException: No file system found with scheme s3a
。我如何正确读取core-site.xml文件并将数据推送到S3?用Apache Flink将数据推送到S3
这是代码:
public class S3Sink {
public static void main(String[] args) throws Exception {
Map<String, String> configs = ConfigUtils.loadConfigs(“path/to/config.yaml");
final ParameterTool parameterTool = ParameterTool.fromMap(configs);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().disableSysoutLogging();
env.getConfig().setGlobalJobParameters(parameterTool);
DataStream<String> messageStream = env
.addSource(new FlinkKafkaConsumer09<String>(
parameterTool.getRequired("kafka.topic"),
new SimpleStringSchema(),
parameterTool.getProperties()));
String id = UUID.randomUUID().toString();
messageStream.writeAsText("s3a://flink-test/" + id + ".txt").setParallelism(1);
env.execute();
}
这是弗林克-conf.yaml文件中的配置变化来引用核心site.xml文件:
fs.hdfs.hadoopconf: /path/to/core-site/etc/hadoop/
这是我的核心-site.xml:
<configuration>
<property>
<name>fs.defaultFS</name>
<value>hdfs://localhost:9000</value>
</property>
<property>
<name>fs.s3.impl</name>
<value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
</property>
<!-- Comma separated list of local directories used to buffer
large results prior to transmitting them to S3. -->
<property>
<name>fs.s3a.buffer.dir</name>
<value>/tmp</value>
</property>
<!-- set your AWS ID using key defined in org.apache.hadoop.fs.s3a.Constants -->
<property>
<name>fs.s3a.awsAccessKeyId</name>
<value>*****</value>
</property>
<!-- set your AWS access key -->
<property>
<name>fs.s3a.awsSecretAccessKey</name>
<value>*****</value>
</property>
答
core-site.xml文件未被读入的原因是由于Hadoop的文件结构。我有HADOOP_HOME=path/to/dir/etc/hadoop
。但是,Hadoop会将etc/hadoop作为其文件结构的一部分来查找core-site.xml。要在HADOOP_HOME环境变量中正确读取路径,应该将其列为HADOOP_HOME=path/to/dir
。
另一个问题是数据没有推到S3的原因。这是因为我正在使用流处理。批量处理的作用是将数据推送到S3,但流处理并不是因为S3如何将数据存储为键/值存储,并且新数据只能被替换。对于流处理,Flink会将数据附加到S3不允许的数据上,因此不会将数据推送到S3。所以这段代码适用于将批量推送到S3
ExecutionEnvironment ee = ExecutionEnvironment.getExecutionEnvironment();
DataSet dataSet = ee.readTextFile("/Users/name/Desktop/flinkoutputtest.txt");
dataSet.writeAsText("s3://flink-test/flink-output/testdoc.txt").setParallelism(1);
ee.execute();
如果将'fs.hdfs.hadoopconf'设置为连续'core-site.xml'文件夹,那么它会起作用吗?还要确保'$ HADOOP_HOME'环境变量设置正确。 –
我正在使用IntelliJ并将环境变量HADOOP_HOME设置为core-site.xml路径。我在本地运行程序,因此fs.hdfs.hadoopconf设置不起作用。 – Sam