如何在Spark中测试不可序列化的代码
问题描述:
我有一个Spark代码发送请求到DynamoDB。用于与数据库建立连接的AmazonDynamoDBClient不可序列化。如何在Spark中测试不可序列化的代码
所以我在斯卡拉使这一类的实例mapPartition
方法中这样的 - >
recordsToWrite.mapPartitions { iter =>
var credentials = new BasicAWSCredentials(awsAccess, awsSecret)
var client= new AmazonDynamoDBClient(credentials)
var dynamoDB=new DynamoDB(client)
var optTable=dynamoDB.getTable(tableName)
iter.map { x =>
//some code....
optTable.updateItem(x)
}
}
的问题是我想测试此代码与当地的火花(火花试验基地)和dynamodb在单元测试中。
我不能拿AmazonDynamoDBClient
出mapPartition
因为它不是序列化(异常是由火花抛出)
答
您可以创建一个DynamoDBFactory
特点,其是序列化,有两种实现方式,一种“真正的”一个和“测试”一个(我假设的问题是如何“注入”的测试客户端):
trait DynamoDBFactory extends Serializable {
def createClient(awsAccess: String, awsSecret: String): DynamoDB
}
class RealDynamoDBFactory extends DynamoDBFactory {
def createClient(awsAccess: String, awsSecret: String): DynamoDB = {
var credentials = new BasicAWSCredentials(awsAccess, awsSecret)
var client= new AmazonDynamoDBClient(credentials)
new DynamoDB(client)
}
}
class TestDynamoDBFactory extends DynamoDBFactory {
def createClient(awsAccess: String, awsSecret: String): DynamoDB = {
// return your test stub/mock/whatever you need
}
}
然后,有你的测试代码期望的DynamoDBFactory
一个实例,并通过正确的实例在测试/生产代码:
val dynamoDBFactory: DynamoDBFactory = // ...get it from caller
recordsToWrite.mapPartitions { iter =>
var dynamoDB = dynamoDBFactory.createClient(awsAccess, awsSecret)
var optTable=dynamoDB.getTable(tableName)
iter.map { x =>
//some code....
optTable.updateItem(x)
}
}
不得不做小的修改,但它的工作。谢谢 :) – cmbendre