在java程序中获取错误写入Kinesis Firehose流

问题描述:

我试图从API(谷歌股票/金融API)将一些数据写入我的AWS Firehose流。我已经在Eclipse上下载并安装了AWS插件,在AWS上设置了我的Firehose流,并且似乎所有内容都已正确安装。但是,我遇到了一些问题。下面的代码似乎不推荐使用...我尝试了不同的版本,但是我似乎无法获得正确的代码。在java程序中获取错误写入Kinesis Firehose流

AmazonKinesisFirehoseClient firehoseClient = new AmazonKinesisFirehoseClient(credentials);

接下来,我收到以下错误。具体错误是,“即使我直接从Amazon的API参考中直接获取,方法setRecord(Record)对于PutRecordRequest类型”“未定义。

request.setRecord(record);

firehoseClient.putRecord(request);

上面的第二线还得到一个错误:在类型AmazonKinesisFirehoseClient “的方法putRecord(com.amazonaws.services.kinesisfirehose.model.PutRecordRequest)是不适用的参数(com.amazonaws。 services.kinesis.model.PutRecordRequest)”

package com.amazonaws.samples; 
import java.io.BufferedReader; 
import java.io.InputStreamReader; 
import java.net.HttpURLConnection; 
import java.net.URL; 
import java.nio.ByteBuffer; 

import org.apache.http.client.CredentialsProvider; 

import com.amazonaws.*; 
import com.amazonaws.AmazonClientException; 
import com.amazonaws.auth.AWSCredentials; 
import com.amazonaws.auth.AWSCredentialsProvider; 
import com.amazonaws.auth.profile.ProfileCredentialsProvider; 
import com.amazonaws.client.builder.AwsClientBuilder; 
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient; 
import com.amazonaws.services.kinesis.AmazonKinesis; 
import com.amazonaws.services.kinesis.AmazonKinesisClient; 
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder; 
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory; 
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream; 
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration; 
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker; 
import com.amazonaws.services.kinesis.model.PutRecordRequest; 
import com.amazonaws.services.kinesis.model.ResourceNotFoundException; 
import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehoseClient; 
import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchRequest; 
import com.amazonaws.services.kinesisfirehose.model.Record; 

public class FirehoseExample { 

    public static void main(String[] args) { 
     AWSCredentials credentials = null; 

     try { 
      credentials = new ProfileCredentialsProvider().getCredentials(); 
     } 

     catch (Exception e) { 
      throw new AmazonClientException("Cannot load the credentials from the credential profiles file. " 
        + "Please make sure that your credentials file is at the correct " 
        + "location (/Users/elybenari/.aws/credentials), and is in valid format.", e); 
     } 

     AmazonKinesisFirehoseClient firehoseClient = new AmazonKinesisFirehoseClient(credentials); 
     PutRecordRequest request = new PutRecordRequest(); 
     request.setStreamName("project-stream"); 

     Record record = new Record(); 

     for (int i = 0; i < 20*60; i++){ 
      try { 
       URL url = new URL("https://www.google.com/finance/info?q=NASDAQ:AMZN"); 
       HttpURLConnection conn = (HttpURLConnection) url.openConnection(); 
       BufferedReader reader = new BufferedReader(new InputStreamReader(conn.getInputStream())); 
       StringBuilder response = new StringBuilder(); 
       String line; 

       while ((line = reader.readLine()) != null) { 
        response.append(line); 
       } 
       reader.close(); 

       System.out.println(response.toString().replace("\n", "").replaceAll(" ", "")); 
       System.out.println("****\n"); 

       ByteBuffer buffer = ByteBuffer.wrap(response.toString().replace("\n", "").replaceAll(" ", "").getBytes()); 
       record.setData(buff); 

       request.setRecord(record); 

       firehoseClient.putRecord(request); 

       Thread.sleep(2000); 


      } 
      catch(Exception e){ 
       e.printStackTrace(); 
      } 
     } 

    } 




    } 

的问题是,你已经有来自室壁运动一些类,室壁运动流水,Java包。对于例如,你使用:

import com.amazonaws.services.kinesis.model.PutRecordRequest; 

然而,你应该已经使用:

import com.amazonaws.services.kinesisfirehose.model.PutRecordRequest; 

室壁运动,室壁运动流水和室壁运动分析是不同的服务,即使他们属于流媒体的一把伞AWS上的服务。因此,它们在Java SDK中具有不同的包名称空间。如果您从官方文档here开始,您将获得正确的Java SDK参考here

编辑:为了回答您的其他问题:是的,下面是弃用:

AmazonKinesisFirehoseClient firehoseClient = new AmazonKinesisFirehoseClient(credentials); 

你应该使用下列内容:

AmazonKinesisFirehoseClient firehoseClient = AmazonKinesisFirehoseClientBuilder.standard().withCredentials(new AWSStaticCredentialsProvider(awsCredentials)).build(); 

请参考官方文档here如何正确初始化AmazonKinesisFirehoseClient。

+0

谢谢...这是非常有帮助的!最后一个问题是,你知道写出这条线是正确的还是非直接的吗? AmazonKinesisFirehoseClient firehoseClient = new AmazonKinesisFirehoseClient(credentials); – user2411290

+0

更新了答案。如果有帮助,不要忘记*接受*答案! –

+0

感谢您的回复......当我添加完全按照原样添加的代码时(除了更改凭据外),出现以下错误:“类型不匹配:无法从AmazonKinesisFirehose转换为AmazonKinesisFirehoseClient” – user2411290