扩展:将CSV转换为json

通过nifi将csv轻松转化为json。
  • 总体流程图:
扩展:将CSV转换为json
  • 下面是流程中使用的CSV文件:
文件名为:users.csv
内容为:
扩展:将CSV转换为json
路径为:/opt
  • 查看并启动控制器服务
从操作板中选择齿轮图标,进入控制服务器:
扩展:将CSV转换为json
这将打开NiFi流程配置窗口。 选择Controller services选项卡:
扩展:将CSV转换为json
通过选择闪电图标/按钮启用AvroSchemaRegistry。 这将允许您启用CSVReader和JSONRecordSetWriter控制器服务。 选择这两个服务的闪电图标。 所有的控制器服务应该在这个时候启用:
扩展:将CSV转换为json
  • 配置控制器服务
首先,点击添加按钮:
扩展:将CSV转换为json
进入控制器添加页面:
扩展:将CSV转换为json
搜索并选择AvroSchemaRegistry添加
扩展:将CSV转换为json
选择小齿轮,进行配置:
为了理解数据,record reader和record writer需要知道与数据关联的模式。一些record reader(例如,Avro Reader)允许从数据本身读取模式。该模式也可以作为FlowFile属性包含在内。但大多数情况下,它都将通过Schema Registry中的名称查询。在这个版本的NiFi中,存在两个Schema Registry实现:基于Avro的Schema Registry服务和用于外部Hortonworks Schema Registry的客户端。
扩展:将CSV转换为json
CSVReader Controller Service
扩展:将CSV转换为json
JsonRecordSetWriter Controller Service
扩展:将CSV转换为json
  • 查看每个流程的详情(属性,内容)
右键要查的processor:
扩展:将CSV转换为json
进入nifi这个processor执行过 的流程:
扩展:将CSV转换为json
点击小图片进入每次执行的详情:
扩展:将CSV转换为json
下面有三个选项卡:分别是详情,属性,内容
扩展:将CSV转换为json

  • 以下是流程的简要概述:
1. GetFile从本地目录中获取用户数据的CSV文件
2. UpdateAttribute将 Schema Name ="users"作为属性添加到流文件中
3. ConvertRecord通过以下方式将流文件内容从CSV转换为JSON:
使用引用AvroSchemaRegistry控制器服务中的模式的CSVReader控制器服务
AvroSchemaRegistry包含一个名为uses的属性,它定义了关于每个记录的信息(字段名称,字段ID,字段类型)
使用引用相同的AvroSchemaRegistry模式的JsonRecordSetWriter控制器服务
4. UpdateAttribute将带有JSON扩展名的文件名作为属性添加到流文件中
5. PutFile将流文件的内容写入本地目录

 1.GetFile Processor获取文件

扩展:将CSV转换为json
2.Add Schema Name Attribute (UpdateAttribute Processor)
流中的下一步是UpdateAttribute处理器,它将schema.name属性的值为“users”添加到流文件中:
扩展:将CSV转换为json
3.ConvertRecord - CSVtoJSON (ConvertRecord Processor)
下一个处理器是ConvertRecord。 看看它的配置,只有两个属性:
记录读取器设置为“CSVReader”,记录写入器设置为“JsonRecordSetWriter”。 
“CSVReader”控制器服务解析传入的CSV数据并确定数据的模式。 
“JsonRecordSetWriter”控制器服务确定数据的模式并将数据写入JSON。
扩展:将CSV转换为json
4.Add JSON File Name Extension (UpdateAttribute Processor)
下一个处理器是另一个UpdateAttribute,它只是将一个JSON扩展添加到原始CSV文件的名称中:
扩展:将CSV转换为json
5.PutFile Processor
最后一个处理器将JSON格式文件放在本地目录中
扩展:将CSV转换为json