How to test non-serializable code in Spark

Problem description:

I have Spark code that sends requests to DynamoDB. The AmazonDynamoDBClient used to open the connection to the database is not serializable. How can I test this non-serializable code in Spark?

So in Scala I create an instance of this class inside the mapPartitions method, like this:

recordsToWrite.mapPartitions { iter =>
  val credentials = new BasicAWSCredentials(awsAccess, awsSecret)
  val client = new AmazonDynamoDBClient(credentials)
  val dynamoDB = new DynamoDB(client)
  val optTable = dynamoDB.getTable(tableName)
  iter.map { x =>
    // some code....
    optTable.updateItem(x)
  }
}

The problem is that I want to test this code in a unit test with local Spark (spark-testing-base) and DynamoDB.

I can't take the AmazonDynamoDBClient out of mapPartitions because it is not serializable (the exception is thrown by Spark).

You can create a DynamoDBFactory trait that is serializable and has two implementations, a "real" one and a "test" one (I'm assuming the question is how to "inject" the test client):

trait DynamoDBFactory extends Serializable {
  def createClient(awsAccess: String, awsSecret: String): DynamoDB
}

class RealDynamoDBFactory extends DynamoDBFactory {
  def createClient(awsAccess: String, awsSecret: String): DynamoDB = {
    val credentials = new BasicAWSCredentials(awsAccess, awsSecret)
    val client = new AmazonDynamoDBClient(credentials)
    new DynamoDB(client)
  }
}

class TestDynamoDBFactory extends DynamoDBFactory {
  def createClient(awsAccess: String, awsSecret: String): DynamoDB = {
    // return your test stub/mock/whatever you need
    ???
  }
}
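For example (this is just a sketch, not part of the original answer), if your unit tests run DynamoDB Local, the test factory could reuse the real client and simply point it at the local endpoint. The localhost:8000 address is an assumption:

class LocalDynamoDBFactory extends DynamoDBFactory {
  def createClient(awsAccess: String, awsSecret: String): DynamoDB = {
    // Assumption: a DynamoDB Local instance is listening on localhost:8000
    val credentials = new BasicAWSCredentials(awsAccess, awsSecret)
    val client = new AmazonDynamoDBClient(credentials)
    client.setEndpoint("http://localhost:8000")
    new DynamoDB(client)
  }
}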

Then have your code take an instance of DynamoDBFactory and pass in the correct implementation from your test / production code:

val dynamoDBFactory: DynamoDBFactory = // ...get it from caller
recordsToWrite.mapPartitions { iter =>
  val dynamoDB = dynamoDBFactory.createClient(awsAccess, awsSecret)
  val optTable = dynamoDB.getTable(tableName)
  iter.map { x =>
    // some code....
    optTable.updateItem(x)
  }
}
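As one possible way to wire this up (again a sketch, not from the original answer), the write logic can take the factory as a parameter, so a unit test built on spark-testing-base's local SparkContext just passes the test implementation while production passes the real one. The writeToDynamo name and the UpdateItemSpec element type are assumptions made for illustration:

import com.amazonaws.services.dynamodbv2.document.spec.UpdateItemSpec
import org.apache.spark.rdd.RDD

// Hypothetical wrapper: production and tests differ only in the factory they pass in.
def writeToDynamo(recordsToWrite: RDD[UpdateItemSpec],
                  dynamoDBFactory: DynamoDBFactory,
                  awsAccess: String,
                  awsSecret: String,
                  tableName: String): Unit = {
  recordsToWrite.mapPartitions { iter =>
    val dynamoDB = dynamoDBFactory.createClient(awsAccess, awsSecret)
    val table = dynamoDB.getTable(tableName)
    iter.map(spec => table.updateItem(spec))
  }.count() // mapPartitions is lazy; force evaluation so the writes actually run
}

// In a unit test:     writeToDynamo(sc.parallelize(specs), new TestDynamoDBFactory, "x", "y", "table")
// In production code: writeToDynamo(records, new RealDynamoDBFactory, awsAccess, awsSecret, tableName)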

Had to make small modifications, but it works. Thanks :) – cmbendre