

我想使用 Google Dataflow 的 Python SDK,从 Google Cloud Spanner 数据库中读取一张表,并将其写入文本文件作为备份。我编写了下面的脚本(标题:使用 Google Dataflow 从 Spanner 读取数据):

from __future__ import absolute_import 

import argparse 
import itertools 
import logging 
import re 
import time 
import datetime as dt 
import apache_beam as beam 
from apache_beam.io import iobase 
from apache_beam.io import WriteToText 
from apache_beam.io.range_trackers import OffsetRangeTracker, UnsplittableRangeTracker 
from apache_beam.metrics import Metrics 
from apache_beam.options.pipeline_options import PipelineOptions 
from apache_beam.options.pipeline_options import StandardOptions, SetupOptions 
from apache_beam.options.pipeline_options import GoogleCloudOptions 

from google.cloud.spanner.client import Client 
from google.cloud.spanner.keyset import KeySet 

# Cloud Spanner coordinates of the table to back up.
PROJECT_ID = 'my_project'
INSTANCE_ID = 'my_instance'
DATABASE_ID = 'my_db'
TABLE = 'my_table'

# Dataflow job name and the GCS bucket that receives the output files.
JOB_NAME = 'spanner-backup'
BUCKET_URL = 'gs://my_bucket'
OUTPUT = BUCKET_URL + '/output/'

class SpannerSource(iobase.BoundedSource):
    """Bounded source that reads every row of ``TABLE`` from Cloud Spanner.

    The source is wrapped in an ``UnsplittableRangeTracker`` and ``split``
    yields a single bundle, so the whole table is read as one unit. Each row
    is emitted as a dict mapping column name to value.
    """

    def __init__(self):
        """Record the Spanner coordinates taken from the module constants."""
        logging.info('Enter __init__')

        self.spannerOptions = {
            "id": PROJECT_ID,
            "instance": INSTANCE_ID,
            "database": DATABASE_ID,
        }
        # Keep the client *class*; the client itself is instantiated in read().
        self.SpannerClient = Client

    def estimate_size(self):
        """Return a fixed placeholder estimate of 1."""
        logging.info('Enter estimate_size')
        return 1

    def get_range_tracker(self, start_position=None, stop_position=None):
        """Return an unsplittable tracker over ``[start_position, stop_position)``.

        Args:
            start_position: Start offset; defaults to 0 when None.
            stop_position: Stop offset; defaults to
                ``OffsetRangeTracker.OFFSET_INFINITY`` when None.
        """
        logging.info('Enter get_range_tracker')
        if start_position is None:
            start_position = 0
        if stop_position is None:
            stop_position = OffsetRangeTracker.OFFSET_INFINITY

        range_tracker = OffsetRangeTracker(start_position, stop_position)
        return UnsplittableRangeTracker(range_tracker)

    def read(self, range_tracker):
        """Yield each row of ``TABLE`` as a ``{column_name: value}`` dict.

        Args:
            range_tracker: Supplied by the runner; not consulted here because
                the table is always read in full.
        """
        # NOTE (original author): observed not to be called when using the
        # DataflowRunner.
        logging.info('Enter read')
        # instantiate spanner client
        spanner_client = self.SpannerClient(self.spannerOptions["id"])
        instance = spanner_client.instance(self.spannerOptions["instance"])
        database = instance.database(self.spannerOptions["database"])

        # Discover the column names first so that each row can be keyed by them.
        table_fields = database.execute_sql("SELECT t.column_name FROM information_schema.columns AS t WHERE t.table_name = '%s'" % TABLE)
        self.columns = [x[0] for x in table_fields]
        keyset = KeySet(all_=True)
        results = database.read(table=TABLE, columns=self.columns, keyset=keyset)

        # Pair every value with its column name.
        for row in results:
            yield dict(zip(self.columns, row))

    def split(self, start_position=None, stop_position=None):
        """Yield a single bundle covering the whole source.

        Args:
            start_position: Start offset; defaults to 0 when None.
            stop_position: Stop offset; defaults to 1 when None.
        """
        # NOTE(review): Beam documents BoundedSource.split as
        # split(desired_bundle_size, start_position=None, stop_position=None);
        # this override has no desired_bundle_size parameter -- confirm against
        # the installed SDK before relying on it under the DataflowRunner.
        logging.info('Enter split')
        if start_position is None:
            start_position = 0
        if stop_position is None:
            stop_position = 1

        # Because the source is unsplittable (for now), only a single bundle
        # is returned.
        yield iobase.SourceBundle(
            weight=1,
            source=self,
            start_position=start_position,
            stop_position=stop_position)

def run(argv=None):
    """Build the Spanner-to-GCS backup pipeline and submit it.

    Args:
        argv: Accepted for the conventional entry-point signature; unused.
    """
    options = PipelineOptions()

    # Google Cloud settings: project, job name and GCS scratch locations.
    gcp_options = options.view_as(GoogleCloudOptions)
    gcp_options.project = PROJECT_ID
    gcp_options.job_name = JOB_NAME
    gcp_options.staging_location = '%s/staging' % BUCKET_URL
    gcp_options.temp_location = '%s/tmp' % BUCKET_URL

    # Set the runner to 'DirectRunner' instead to execute locally.
    options.view_as(StandardOptions).runner = 'DataflowRunner'
    pipeline = beam.Pipeline(options=options)

    spanner_rows = pipeline | 'Get Rows from Spanner' >> beam.io.Read(SpannerSource())

    # Prefix the output files with the launch time (second precision).
    timestamp = dt.datetime.now().replace(microsecond=0).isoformat()
    sink = WriteToText(file_path_prefix=OUTPUT + timestamp + '-' + TABLE, file_name_suffix='')
    # Original author's note: without this write step the job completes but
    # does not do anything.
    spanner_rows | 'Store in GCS' >> sink

    job = pipeline.run()

if __name__ == '__main__':
    # Surface the logging.info() progress messages emitted by the pipeline code.
    logging.getLogger().setLevel(logging.INFO)
    run()


"Executing failure step failure14 [...] Workflow failed. Causes: [...] The worker lost contact with the service."

有时作业会一直运行下去,却不产生任何输出。此外,如果我注释掉 'output = ...' 这一行,作业可以完成,但并没有实际读取数据。


有没有人知道可能是什么原因导致的?我知道有一个(更完整的)Java SDK,其中带有实验性的 Spanner source/sink,但如果可能的话,我更愿意使用 Python。



我们优先实现了 Java 版的 Dataflow 连接器。我建议你要么使用 Java,要么在 Python 中用 ParDo 自行实现连接器,参见 https://beam.apache.org/documentation/sdks/python-custom-io/ –


谢谢@MairbekKhadikov。我会暂时尝试ParDo的方式。 –

我按照建议重写了代码,直接使用 ParDo,而不再使用 BoundedSource 类。作为参考,下面是我的解决方案;我相信它还有很多可以改进的地方,也很乐意听取意见。特别让我惊讶的是,启动管道时必须先创建一个虚拟的(dummy)PCollection(如果不这样做,就会得到下面的错误):

AttributeError: 'PBegin' object has no attribute 'windowing'


from __future__ import absolute_import 

import datetime as dt 
import logging 

import apache_beam as beam 
from apache_beam.io import WriteToText 
from apache_beam.options.pipeline_options import PipelineOptions 
from apache_beam.options.pipeline_options import StandardOptions, SetupOptions 
from apache_beam.options.pipeline_options import GoogleCloudOptions 
from google.cloud.spanner.client import Client 
from google.cloud.spanner.keyset import KeySet 

# Cloud Spanner coordinates of the database to back up.
PROJECT_ID = 'my_project'
INSTANCE_ID = 'my_instance'
DATABASE_ID = 'my_database'

# Dataflow job name and the GCS bucket that receives the output files.
JOB_NAME = 'my_jobname'
BUCKET_URL = 'gs://my_bucket'
OUTPUT = BUCKET_URL + '/some_folder/'

class ReadTables(beam.DoFn):
    """DoFn that emits the name of each table found in a Spanner database.

    The incoming element is ignored; it only serves to trigger the lookup.
    """

    # Table names that are never emitted (presumably Spanner's own
    # information_schema tables -- confirm).
    _SKIPPED_TABLES = frozenset(
        [u'COLUMNS', u'INDEXES', u'INDEX_COLUMNS', u'SCHEMATA', u'TABLES'])

    def __init__(self, project, instance, database):
        """Store the Spanner coordinates.

        Args:
            project: Project id passed to the Spanner ``Client``.
            instance: Spanner instance id.
            database: Spanner database id.
        """
        super(ReadTables, self).__init__()
        self._project = project
        self._instance = instance
        self._database = database

    def process(self, element):
        """Yield every table name that is not in ``_SKIPPED_TABLES``.

        Args:
            element: Ignored trigger element.
        """
        database = Client(self._project).instance(self._instance).database(self._database)
        # get list of tables in the database
        table_names_row = database.execute_sql('SELECT t.table_name FROM information_schema.tables AS t')
        for row in table_names_row:
            if row[0] in self._SKIPPED_TABLES:
                continue
            yield row[0]

class ReadSpannerTable(beam.DoFn):
    """DoFn that reads all rows of the Spanner table named by each element."""

    def __init__(self, project, instance, database):
        """Store the Spanner coordinates.

        Args:
            project: Project id passed to the Spanner ``Client``.
            instance: Spanner instance id.
            database: Spanner database id.
        """
        super(ReadSpannerTable, self).__init__()
        self._project = project
        self._instance = instance
        self._database = database

    def process(self, element):
        """Yield ``(table_name, row_dict)`` for every row of the table.

        Args:
            element: Name of the table to read.

        Yields:
            Pairs of the table name and a dict mapping column name to value.
        """
        # One database handle serves both the schema lookup and the data read.
        database = Client(self._project).instance(self._instance).database(self._database)

        # first read the columns present in the table
        table_fields = database.execute_sql("SELECT t.column_name FROM information_schema.columns AS t WHERE t.table_name = '%s'" % element)
        columns = [x[0] for x in table_fields]

        # next, read the actual data in the table
        keyset = KeySet(all_=True)
        results_streamed_set = database.read(table=element, columns=columns, keyset=keyset)

        for row in results_streamed_set:
            # zip() replaces the Python-2-only xrange index loop.
            yield (element, dict(zip(columns, row)))  # output pairs of (table_name, data)

def run(argv=None):
    """Build the whole-database Spanner backup pipeline and submit it.

    The pipeline lists the tables, reads every row of each table, groups the
    rows by table name and writes the result to GCS as text.

    Args:
        argv: Accepted for the conventional entry-point signature; unused.
    """
    pipeline_options = PipelineOptions()
    setup_options = pipeline_options.view_as(SetupOptions)
    setup_options.save_main_session = True
    setup_options.requirements_file = "requirements.txt"
    google_cloud_options = pipeline_options.view_as(GoogleCloudOptions)
    # The module constant is PROJECT_ID; the name PROJECT is never defined.
    google_cloud_options.project = PROJECT_ID
    google_cloud_options.job_name = JOB_NAME
    google_cloud_options.staging_location = '%s/staging' % BUCKET_URL
    google_cloud_options.temp_location = '%s/tmp' % BUCKET_URL

    pipeline_options.view_as(StandardOptions).runner = 'DataflowRunner'
    p = beam.Pipeline(options=pipeline_options)

    # A ParDo needs an input PCollection, so seed the pipeline with one dummy
    # element; ReadTables ignores its value.
    init = p | 'Begin pipeline' >> beam.Create(["test"])
    # read the tables in the db
    tables = init | 'Get tables from Spanner' >> beam.ParDo(
        ReadTables(PROJECT_ID, INSTANCE_ID, DATABASE_ID))
    rows = (
        tables
        # for each table, read the entries
        | 'Get rows from Spanner table' >> beam.ParDo(
            ReadSpannerTable(PROJECT_ID, INSTANCE_ID, DATABASE_ID))
        | 'Group by table' >> beam.GroupByKey()
        # Force the grouped values to a list (the original author observed
        # the DataflowRunner producing _UnwindowedValues here). Indexing the
        # pair replaces the tuple-parameter lambda, which is Python 2 only.
        | 'Formatting' >> beam.Map(lambda kv: (kv[0], list(kv[1]))))

    iso_datetime = dt.datetime.now().replace(microsecond=0).isoformat()
    rows | 'Store in GCS' >> WriteToText(file_path_prefix=OUTPUT + iso_datetime, file_name_suffix='')

    result = p.run()

if __name__ == '__main__': 