Description
Hi
The TF Transform Analyze and Transform step fails when executed on the Dataflow runner. The same step completes successfully when the pipeline is run on the Direct runner. For both the Direct run and the Dataflow run, I used the exact same train.csv file hosted in GCS.
Here is the code for a very minimal pre-processing:
`# Preprocessing function
def preprocessing_fn(inputs):
    """Pass the numeric features through untouched and cast the label to int64.

    Args:
        inputs: Mapping from column name to tensor. It is indexed with
            'feature_1' through 'feature_28' and with 'label'.

    Returns:
        A dict holding the 28 feature entries exactly as found in `inputs`,
        plus 'label' cast to `tf.dtypes.int64`.
    """
    # TensorFlow is imported inside the function, as in the original snippet.
    import tensorflow as tf

    numeric_feature_names = (
        'feature_1', 'feature_2', 'feature_3', 'feature_4',
        'feature_5', 'feature_6', 'feature_7', 'feature_8',
        'feature_9', 'feature_10', 'feature_11', 'feature_12',
        'feature_13', 'feature_14', 'feature_15', 'feature_16',
        'feature_17', 'feature_18', 'feature_19', 'feature_20',
        'feature_21', 'feature_22', 'feature_23', 'feature_24',
        'feature_25', 'feature_26', 'feature_27', 'feature_28',
    )
    label_name = 'label'

    # Identity mapping for every selected feature; only the label is changed.
    transformed = {name: inputs[name] for name in numeric_feature_names}
    transformed[label_name] = tf.dtypes.cast(inputs[label_name], tf.dtypes.int64)
    return transformed
def encode_data(batch, temp):
    """Convert one record batch into encoded examples.

    Used with `beam.FlatMapTuple`, so each pipeline element is unpacked into
    the two positional parameters and the returned value is flattened.

    Args:
        batch: The record batch handed to `RecordBatchToExamples`.
        temp: Second member of the unpacked element; not used here.

    Returns:
        Whatever `RecordBatchToExamples(batch)` returns.
    """
    # Imported inside the function, as in the original snippet.
    from tfx_bsl.coders import example_coder

    return example_coder.RecordBatchToExamples(batch)
# Pipeline code
def transform_data(train_data_file,
test_data_file,
transformed_train_dataset_location,
transformed_test_dataset_location,
transform_func_location,
args):
runner = args['runner']
temp_dir = args.pop('temp_dir')
input_schema = args.pop('input_schema')
ordered_columns = args.pop('ordered_columns')
input_schema = tft.DatasetMetadata.from_feature_spec(raw_data_feature_spec).schema
params2str = lambda key, value: f"--{key}={value}" if value else f"--{key}"
options = []
for (key, value) in args.items():
options.append(params2str(key, value))
print(options)
pipeline_options = PipelineOptions(options)
with beam.Pipeline(options=pipeline_options) as pipeline:
with tft_beam.Context(temp_dir=temp_dir):
train_csv_tfxio = tfxio.CsvTFXIO(
file_pattern=train_data_file,
skip_header_lines=1,
column_names=ordered_columns,
schema=input_schema)
raw_data = (
pipeline |
'ReadTrainCsv' >> train_csv_tfxio.BeamSource()
)
raw_dataset = (raw_data, train_csv_tfxio.TensorAdapterConfig())
transformed_dataset, transform_fn = (
raw_dataset |
tft_beam.AnalyzeAndTransformDataset(preprocessing_fn, output_record_batches=True)
)
transformed_data, _ = transformed_dataset
_ = ( transformed_data
| 'EncodeTrainData' >> beam.FlatMapTuple(encode_data)
| 'WriteTrainData' >> beam.io.WriteToTFRecord(transformed_train_dataset_location)
)
# Apply transform function to test data.
test_csv_tfxio = tfxio.CsvTFXIO(
file_pattern=test_data_file,
skip_header_lines=1,
column_names=ordered_columns,
schema=input_schema
)
raw_test_data = (
pipeline |
'ReadTestCsv' >> test_csv_tfxio.BeamSource()
)
raw_test_dataset = (raw_test_data, test_csv_tfxio.TensorAdapterConfig())
transformed_test_dataset = (
(raw_test_dataset, transform_fn)
| tft_beam.TransformDataset(output_record_batches=True)
)
transformed_test_data, _ = transformed_test_dataset
_ = (transformed_test_data
| 'EncodeTestData' >> beam.FlatMapTuple(encode_data)
| 'WriteTestData' >> beam.io.WriteToTFRecord(transformed_test_dataset_location)
)
_ = (
transform_fn
| 'WriteTransformFn' >> tft_beam.WriteTransformFn(transform_func_location)
)`
Error Encountered:
`ERROR:apache_beam.runners.dataflow.dataflow_runner:Console URL: https://console.cloud.google.com/dataflow/jobs//2022-12-31_00_37_41-4620021112150654208?project=
Traceback (most recent call last):
File "/Users/santoshsivapurapu/Documents/Projects/0_Sandbox/predict-higgs-boson-collision-events/Notebooks/utils/transformer.py", line 211, in
transform_data(train_data_file=TRAIN_DATASET_URI,
File "/Users/santoshsivapurapu/Documents/Projects/0_Sandbox/predict-higgs-boson-collision-events/Notebooks/utils/transformer.py", line 178, in transform_data
_ = (
File "/Users/santoshsivapurapu/opt/anaconda3/envs/tfx/lib/python3.9/site-packages/apache_beam/pipeline.py", line 598, in exit
self.result.wait_until_finish()
File "/Users/santoshsivapurapu/opt/anaconda3/envs/tfx/lib/python3.9/site-packages/apache_beam/runners/dataflow/dataflow_runner.py", line 1673, in wait_until_finish
raise DataflowRuntimeException(
apache_beam.runners.dataflow.dataflow_runner.DataflowRuntimeException: Dataflow pipeline failed. State: FAILED, Error:
Traceback (most recent call last):
File "apache_beam/runners/common.py", line 1417, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 837, in apache_beam.runners.common.PerWindowInvoker.invoke_process
File "apache_beam/runners/common.py", line 981, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
File "apache_beam/runners/common.py", line 1581, in apache_beam.runners.common._OutputHandler.handle_process_outputs
File "apache_beam/runners/common.py", line 1694, in apache_beam.runners.common._OutputHandler._write_value_to_tag
File "apache_beam/runners/worker/operations.py", line 321, in apache_beam.runners.worker.operations.GeneralPurposeConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 198, in apache_beam.runners.worker.operations.ConsumerSet.update_counters_start
File "apache_beam/runners/worker/opcounters.py", line 215, in apache_beam.runners.worker.opcounters.OperationCounters.update_from
File "apache_beam/runners/worker/opcounters.py", line 267, in apache_beam.runners.worker.opcounters.OperationCounters.do_sample
File "apache_beam/coders/coder_impl.py", line 1471, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
File "apache_beam/coders/coder_impl.py", line 1482, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
File "apache_beam/coders/coder_impl.py", line 207, in apache_beam.coders.coder_impl.CoderImpl.get_estimated_size_and_observables
File "apache_beam/coders/coder_impl.py", line 950, in apache_beam.coders.coder_impl.VarIntCoderImpl.estimate_size
File "apache_beam/coders/stream.pyx", line 234, in apache_beam.coders.stream.get_varint_size
TypeError: an integer is required
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker.py", line 284, in _execute
response = task()
File "/usr/local/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker.py", line 357, in
lambda: self.create_worker().do_instruction(request), request)
File "/usr/local/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker.py", line 597, in do_instruction
return getattr(self, request_type)(
File "/usr/local/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker.py", line 635, in process_bundle
bundle_processor.process_bundle(instruction_id))
File "/usr/local/lib/python3.9/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1003, in process_bundle
input_op_by_transform_id[element.transform_id].process_encoded(
File "/usr/local/lib/python3.9/site-packages/apache_beam/runners/worker/bundle_processor.py", line 227, in process_encoded
self.output(decoded_value)
File "apache_beam/runners/worker/operations.py", line 526, in apache_beam.runners.worker.operations.Operation.output
File "apache_beam/runners/worker/operations.py", line 528, in apache_beam.runners.worker.operations.Operation.output
File "apache_beam/runners/worker/operations.py", line 237, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 240, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 907, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/worker/operations.py", line 908, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/common.py", line 1419, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 1491, in apache_beam.runners.common.DoFnRunner._reraise_augmented
File "apache_beam/runners/common.py", line 1417, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 623, in apache_beam.runners.common.SimpleInvoker.invoke_process
File "apache_beam/runners/common.py", line 1581, in apache_beam.runners.common._OutputHandler.handle_process_outputs
File "apache_beam/runners/common.py", line 1694, in apache_beam.runners.common._OutputHandler._write_value_to_tag
File "apache_beam/runners/worker/operations.py", line 240, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 907, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/worker/operations.py", line 908, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/common.py", line 1419, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 1491, in apache_beam.runners.common.DoFnRunner._reraise_augmented
File "apache_beam/runners/common.py", line 1417, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 623, in apache_beam.runners.common.SimpleInvoker.invoke_process
File "apache_beam/runners/common.py", line 1581, in apache_beam.runners.common._OutputHandler.handle_process_outputs
File "apache_beam/runners/common.py", line 1694, in apache_beam.runners.common._OutputHandler._write_value_to_tag
File "apache_beam/runners/worker/operations.py", line 240, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 907, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/worker/operations.py", line 908, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/common.py", line 1419, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 1491, in apache_beam.runners.common.DoFnRunner._reraise_augmented
File "apache_beam/runners/common.py", line 1417, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 837, in apache_beam.runners.common.PerWindowInvoker.invoke_process
File "apache_beam/runners/common.py", line 981, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
File "apache_beam/runners/common.py", line 1581, in apache_beam.runners.common._OutputHandler.handle_process_outputs
File "apache_beam/runners/common.py", line 1694, in apache_beam.runners.common._OutputHandler._write_value_to_tag
File "apache_beam/runners/worker/operations.py", line 240, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 907, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/worker/operations.py", line 908, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/common.py", line 1419, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 1507, in apache_beam.runners.common.DoFnRunner._reraise_augmented
File "apache_beam/runners/common.py", line 1417, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 837, in apache_beam.runners.common.PerWindowInvoker.invoke_process
File "apache_beam/runners/common.py", line 981, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
File "apache_beam/runners/common.py", line 1581, in apache_beam.runners.common._OutputHandler.handle_process_outputs
File "apache_beam/runners/common.py", line 1694, in apache_beam.runners.common._OutputHandler._write_value_to_tag
File "apache_beam/runners/worker/operations.py", line 321, in apache_beam.runners.worker.operations.GeneralPurposeConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 198, in apache_beam.runners.worker.operations.ConsumerSet.update_counters_start
File "apache_beam/runners/worker/opcounters.py", line 215, in apache_beam.runners.worker.opcounters.OperationCounters.update_from
File "apache_beam/runners/worker/opcounters.py", line 267, in apache_beam.runners.worker.opcounters.OperationCounters.do_sample
File "apache_beam/coders/coder_impl.py", line 1471, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
File "apache_beam/coders/coder_impl.py", line 1482, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
File "apache_beam/coders/coder_impl.py", line 207, in apache_beam.coders.coder_impl.CoderImpl.get_estimated_size_and_observables
File "apache_beam/coders/coder_impl.py", line 950, in apache_beam.coders.coder_impl.VarIntCoderImpl.estimate_size
File "apache_beam/coders/stream.pyx", line 234, in apache_beam.coders.stream.get_varint_size
TypeError: an integer is required [while running 'AnalyzeAndTransformDataset/AnalyzeDataset/InstrumentInputBytes[AnalysisFlattenedPColl]/IncrementCounter/IncrementCounter-ptransform-35']`
Version info:
tensorflow-2.8.4
tensorflow-transform-1.8.0
tensorflow_data_validation-1.8.0
tfx-bsl-1.8.0
Python runtime version: Python 3.9.14
Apache Beam SDK 2.42.0