diff --git a/.doc_gen/metadata/s3-control_metadata.yaml b/.doc_gen/metadata/s3-control_metadata.yaml index 5b34a0a4eac..d7ffd4f54ed 100644 --- a/.doc_gen/metadata/s3-control_metadata.yaml +++ b/.doc_gen/metadata/s3-control_metadata.yaml @@ -13,6 +13,15 @@ s3-control_Hello: - description: snippet_tags: - s3control.java2.list_jobs.main + Python: + versions: + - sdk_version: 3 + github: python/example_code/s3/scenarios/batch + sdkguide: + excerpts: + - description: + snippet_tags: + - python.example_code.s3control.list_jobs services: s3-control: {ListJobs} @@ -29,15 +38,24 @@ s3-control_CreateJob: - s3control.java2.create_job.async.main - description: Create a compliance retention job. snippet_tags: - - s3control.java2.create_job.compliance.main + - s3control.java2.create_job.compliance.main - description: Create a legal hold off job. snippet_tags: - - s3control.java2.create_job.compliance.main + - s3control.java2.create_job.compliance.main - description: Create a new governance retention job. snippet_tags: - - s3.java2.create_governance_retemtion.main + - s3.java2.create_governance_retemtion.main + Python: + versions: + - sdk_version: 3 + github: python/example_code/s3/scenarios/batch + sdkguide: + excerpts: + - description: + snippet_tags: + - python.example_code.s3control.create_job services: - s3-control: {CreateJob} + s3-control: {CreateJob} s3-control_PutJobTagging: languages: Java: @@ -49,8 +67,17 @@ s3-control_PutJobTagging: - description: snippet_tags: - s3control.java2.job.put.tags.main + Python: + versions: + - sdk_version: 3 + github: python/example_code/s3/scenarios/batch + sdkguide: + excerpts: + - description: + snippet_tags: + - python.example_code.s3control.put_job_tagging services: - s3-control: {PutJobTagging} + s3-control: {PutJobTagging} s3-control_DescribeJob: languages: Java: @@ -62,8 +89,17 @@ s3-control_DescribeJob: - description: snippet_tags: - s3control.java2.describe_job.main + Python: + versions: + - sdk_version: 3 + github: python/example_code/s3/scenarios/batch + sdkguide: + excerpts: + - description: + snippet_tags: + - python.example_code.s3control.describe_job services: - s3-control: {DescribeJob} + s3-control: {DescribeJob} s3-control_DeleteJobTagging: languages: Java: @@ -75,8 +111,17 @@ s3-control_DeleteJobTagging: - description: snippet_tags: - s3control.java2.del_job_tagging.main + Python: + versions: + - sdk_version: 3 + github: python/example_code/s3/scenarios/batch + sdkguide: + excerpts: + - description: + snippet_tags: + - python.example_code.s3control.delete_job_tagging services: - s3-control: {DeleteJobTagging} + s3-control: {DeleteJobTagging} s3-control_GetJobTagging: languages: Java: @@ -88,8 +133,17 @@ s3-control_GetJobTagging: - description: snippet_tags: - s3control.java2.get_job_tagging.main + Python: + versions: + - sdk_version: 3 + github: python/example_code/s3/scenarios/batch + sdkguide: + excerpts: + - description: + snippet_tags: + - python.example_code.s3control.get_job_tagging services: - s3-control: {GetJobTagging} + s3-control: {GetJobTagging} s3-control_UpdateJobStatus: languages: Java: @@ -101,8 +155,17 @@ s3-control_UpdateJobStatus: - description: snippet_tags: - s3control.java2.cancel_job.main + Python: + versions: + - sdk_version: 3 + github: python/example_code/s3/scenarios/batch + sdkguide: + excerpts: + - description: + snippet_tags: + - python.example_code.s3control.update_job_status services: - s3-control: {UpdateJobStatus} + s3-control: {UpdateJobStatus} s3-control_UpdateJobPriority: languages: Java: @@ -114,8 +177,17 
@@ s3-control_UpdateJobPriority:
       - description:
         snippet_tags:
           - s3control.java2.update_job.main
+  Python:
+    versions:
+      - sdk_version: 3
+        github: python/example_code/s3/scenarios/batch
+        sdkguide:
+        excerpts:
+          - description:
+            snippet_tags:
+              - python.example_code.s3control.update_job_priority
   services:
-    s3-control: {UpdateJobPriority}
+    s3-control: {UpdateJobPriority}
 s3-control_Basics:
   synopsis: learn core operations for &S3Control;.
   category: Basics
@@ -132,5 +204,24 @@ s3-control_Basics:
       - description: An action class that wraps operations.
         snippet_tags:
           - s3control.java2.job.actions.main
+  Python:
+    versions:
+      - sdk_version: 3
+        github: python/example_code/s3/scenarios/batch
+        sdkguide:
+        excerpts:
+          - description: Run an interactive scenario demonstrating S3 Batch Operations basics.
+            snippet_tags:
+              - python.example_code.s3control.helper.S3BatchScenario
+          - description: A wrapper class for S3 Batch Operations actions.
+            snippet_tags:
+              - python.example_code.s3control.helper.S3BatchWrapper
   services:
-  s3-control: {CreateJob, DeleteJobTagging, DescribeJob, GetJobTagging, ListJobs, PutJobTagging, UpdateJobPriority, UpdateJobStatus}
+  s3-control:
+    {
+      CreateJob,
+      DeleteJobTagging,
+      DescribeJob,
+      GetJobTagging,
+      ListJobs,
+      PutJobTagging,
+      UpdateJobPriority,
+      UpdateJobStatus,
+    }
diff --git a/python/example_code/s3/scenarios/batch/README.md b/python/example_code/s3/scenarios/batch/README.md
new file mode 100644
index 00000000000..9500bcf0049
--- /dev/null
+++ b/python/example_code/s3/scenarios/batch/README.md
@@ -0,0 +1,56 @@
+# Amazon S3 Batch Operations for the SDK for Python (boto3)
+
+## Overview
+
+This example demonstrates how to use the AWS SDK for Python (boto3) to work with Amazon Simple Storage Service (Amazon S3) Batch Operations. The scenario covers operations such as creating a batch job, updating its priority, canceling it, and managing the tags associated with it.
+
+This scenario walks through the following service operations:
+
+1. **Create an S3 Batch Operations job**: Creates a job that applies tags to the objects listed in a manifest.
+
+2. **Update the job priority**: Modifies the job priority and activates the job.
+
+3. **Cancel the job**: Optionally cancels the batch job.
+
+4. **Describe the job**: Shows detailed information about the job.
+
+5. **Get the job tags**: Retrieves the tags associated with the job.
+
+6. **Put job tags**: Adds additional tags to the job.
+
+7. **List jobs**: Lists all batch jobs for the account.
+
+8. **Delete the job tags**: Removes tags from the job.
+
+## ⚠ Important
+
+- Running this code might result in charges to your AWS account. For more details, see [AWS Pricing](https://aws.amazon.com/pricing/) and [Free Tier](https://aws.amazon.com/free/).
+- Running the tests might result in charges to your AWS account.
+- We recommend that you grant your code least privilege. At most, grant only the minimum permissions required to perform the task. For more information, see [Grant least privilege](https://docs.aws.amazon.com/IAM/latest/UserGuide/best-practices.html#grant-least-privilege).
+- This code is not tested in every AWS Region. For more information, see [AWS Regional Services](https://aws.amazon.com/about-aws/global-infrastructure/regional-product-services).
+
+## Code examples
+
+### Prerequisites
+
+To run these examples, you need:
+
+- Python 3.x installed.
+- Run `python -m pip install -r requirements.txt` to install dependencies.
+- AWS credentials configured. For more information, see [Configuring the AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-files.html).
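+
+#### Example manifest
+
+The scenario generates a simple CSV manifest (`job-manifest.csv`) that lists the objects for the job to process, one `bucket,key` pair per line. For illustration, with a hypothetical bucket named `demo-s3-batch-1234`, the generated manifest would look like this:
+
+```
+demo-s3-batch-1234,object-key-1.txt
+demo-s3-batch-1234,object-key-2.txt
+demo-s3-batch-1234,object-key-3.txt
+demo-s3-batch-1234,object-key-4.txt
+```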
+
+#### Running the workflow
+
+To run this workflow, make sure your AWS credentials are configured, then run the command below:
+
+```bash
+python s3_batch_scenario.py
+```
+
+## Additional resources
+
+- [S3 Batch Operations in the Amazon S3 User Guide](https://docs.aws.amazon.com/AmazonS3/latest/userguide/batch-ops.html)
+- [Amazon S3 API Reference](https://docs.aws.amazon.com/AmazonS3/latest/API/Welcome.html)
+- [boto3 Amazon S3 reference](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html)
+
+---
+
+© Amazon.com, Inc. or its affiliates. All Rights Reserved.
+
+SPDX-License-Identifier: Apache-2.0
diff --git a/python/example_code/s3/scenarios/batch/cloudformation_helper.py b/python/example_code/s3/scenarios/batch/cloudformation_helper.py
new file mode 100644
index 00000000000..7c1221c70aa
--- /dev/null
+++ b/python/example_code/s3/scenarios/batch/cloudformation_helper.py
@@ -0,0 +1,170 @@
+# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+# SPDX-License-Identifier: Apache-2.0
+
+"""
+Helper class for managing CloudFormation stack operations for S3 Batch Operations.
+"""
+
+import json
+from typing import Dict, Any
+
+from botocore.exceptions import ClientError, WaiterError
+
+# snippet-start:[python.example_code.s3control.CloudFormationHelper]
+class CloudFormationHelper:
+    """Helper class for managing CloudFormation stack operations."""
+
+    def __init__(self, cfn_client: Any) -> None:
+        """
+        Initializes the CloudFormationHelper with a CloudFormation client.
+
+        :param cfn_client: A Boto3 Amazon CloudFormation client. This client provides
+            low-level access to AWS CloudFormation services.
+        """
+        self.cfn_client = cfn_client
+
+    def deploy_cloudformation_stack(self, stack_name: str) -> None:
+        """
+        Deploy a CloudFormation stack with S3 batch operation permissions.
+
+        Args:
+            stack_name (str): Name of the CloudFormation stack
+
+        Raises:
+            ClientError: If stack creation fails
+        """
+        try:
+            template = {
+                "AWSTemplateFormatVersion": "2010-09-09",
+                "Resources": {
+                    "S3BatchRole": {
+                        "Type": "AWS::IAM::Role",
+                        "Properties": {
+                            "AssumeRolePolicyDocument": {
+                                "Version": "2012-10-17",
+                                "Statement": [
+                                    {
+                                        "Effect": "Allow",
+                                        "Principal": {
+                                            "Service": "batchoperations.s3.amazonaws.com"
+                                        },
+                                        "Action": "sts:AssumeRole"
+                                    }
+                                ]
+                            },
+                            "ManagedPolicyArns": [
+                                "arn:aws:iam::aws:policy/AmazonS3FullAccess"
+                            ],
+                            "Policies": [
+                                {
+                                    "PolicyName": "S3BatchOperationsPolicy",
+                                    "PolicyDocument": {
+                                        "Version": "2012-10-17",
+                                        "Statement": [
+                                            {
+                                                "Effect": "Allow",
+                                                "Action": [
+                                                    "s3:*",
+                                                    "s3-object-lambda:*"
+                                                ],
+                                                "Resource": "*"
+                                            }
+                                        ]
+                                    }
+                                }
+                            ]
+                        }
+                    }
+                },
+                "Outputs": {
+                    "S3BatchRoleArn": {
+                        "Description": "ARN of IAM Role for S3 Batch Operations",
+                        "Value": {"Fn::GetAtt": ["S3BatchRole", "Arn"]}
+                    }
+                }
+            }
+
+            self.cfn_client.create_stack(
+                StackName=stack_name,
+                TemplateBody=json.dumps(template),
+                Capabilities=['CAPABILITY_IAM']
+            )
+
+            print(f"Creating stack {stack_name}...")
+            self._wait_for_stack_completion(stack_name, 'CREATE')
+            print(f"Stack {stack_name} created successfully")
+
+        except ClientError as e:
+            print(f"Error creating CloudFormation stack: {e}")
+            raise
+
+    def get_stack_outputs(self, stack_name: str) -> Dict[str, str]:
+        """
+        Get CloudFormation stack outputs.
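+
+        The scenario uses this method to read the S3BatchRoleArn output
+        exported by the template deployed above.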
+ + Args: + stack_name (str): Name of the CloudFormation stack + + Returns: + dict: Stack outputs + + Raises: + ClientError: If getting stack outputs fails + """ + try: + response = self.cfn_client.describe_stacks(StackName=stack_name) + outputs = {} + if 'Stacks' in response and response['Stacks']: + for output in response['Stacks'][0].get('Outputs', []): + outputs[output['OutputKey']] = output['OutputValue'] + return outputs + + except ClientError as e: + print(f"Error getting stack outputs: {e}") + raise + + def destroy_cloudformation_stack(self, stack_name: str) -> None: + """ + Delete a CloudFormation stack. + + Args: + stack_name (str): Name of the CloudFormation stack + + Raises: + ClientError: If stack deletion fails + """ + try: + self.cfn_client.delete_stack(StackName=stack_name) + print(f"Deleting stack {stack_name}...") + self._wait_for_stack_completion(stack_name, 'DELETE') + print(f"Stack {stack_name} deleted successfully") + + except ClientError as e: + print(f"Error deleting CloudFormation stack: {e}") + raise + + def _wait_for_stack_completion(self, stack_name: str, operation: str) -> None: + """ + Wait for CloudFormation stack operation to complete. + + Args: + stack_name (str): Name of the CloudFormation stack + operation (str): Stack operation (CREATE or DELETE) + + Raises: + WaiterError: If waiting for stack completion fails + """ + try: + waiter = self.cfn_client.get_waiter( + 'stack_create_complete' if operation == 'CREATE' + else 'stack_delete_complete' + ) + waiter.wait( + StackName=stack_name, + WaiterConfig={'Delay': 5, 'MaxAttempts': 60} + ) + except WaiterError as e: + print(f"Error waiting for stack {operation}: {e}") + raise +# snippet-end:[python.example_code.s3control.CloudFormationHelper] \ No newline at end of file diff --git a/python/example_code/s3/scenarios/batch/requirements.txt b/python/example_code/s3/scenarios/batch/requirements.txt new file mode 100644 index 00000000000..2c9802951bf --- /dev/null +++ b/python/example_code/s3/scenarios/batch/requirements.txt @@ -0,0 +1,2 @@ +boto3>=1.26.0 +botocore>=1.29.0 \ No newline at end of file diff --git a/python/example_code/s3/scenarios/batch/s3_batch_scenario.py b/python/example_code/s3/scenarios/batch/s3_batch_scenario.py new file mode 100644 index 00000000000..70af2554428 --- /dev/null +++ b/python/example_code/s3/scenarios/batch/s3_batch_scenario.py @@ -0,0 +1,205 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 + +""" +S3 Batch Operations Scenario + +This scenario demonstrates how to use AWS S3 Batch Operations to perform large-scale +operations on S3 objects. The scenario includes the following steps: + +1. Create S3 Batch Job - Creates a batch job to tag objects +2. Update Job Priority - Modifies the job priority and activates the job +3. Cancel Job - Optionally cancels the batch job +4. Describe Job Details - Shows detailed information about the job +5. Get Job Tags - Retrieves tags associated with the job +6. Put Job Tags - Adds additional tags to the job +7. List Jobs - Lists all batch jobs for the account +8. Delete Job Tags - Removes tags from the job + +The scenario uses CloudFormation to create necessary IAM roles and demonstrates +proper resource cleanup at the end. 
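+
+Usage (assuming default AWS credentials and Region are configured):
+
+    python s3_batch_scenario.py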
+""" + +import time +import uuid +import sys +from typing import Tuple + +import boto3 +from cloudformation_helper import CloudFormationHelper +from s3_batch_wrapper import S3BatchWrapper +sys.path.append("../../../..") +import demo_tools.question as q + +# snippet-start:[python.example_code.s3control.helper.S3BatchScenario] +class S3BatchScenario: + """Manages the S3 Batch Operations scenario.""" + + DASHES = "-" * 80 + STACK_NAME = "MyS3Stack" + + def __init__(self, s3_batch_wrapper: S3BatchWrapper, cfn_helper: CloudFormationHelper) -> None: + """ + Initialize the S3 Batch scenario. + + Args: + s3_batch_wrapper: S3BatchWrapper instance + cfn_helper: CloudFormationHelper instance + """ + self.s3_batch_wrapper = s3_batch_wrapper + self.cfn_helper = cfn_helper + + def wait_for_input(self) -> None: + """Wait for user input to continue.""" + q.ask("\nPress Enter to continue...") + print() + + def setup_resources(self, bucket_name: str, file_names: list) -> Tuple[str, str]: + """ + Set up initial resources for the scenario. + + Args: + bucket_name (str): Name of the bucket to create + file_names (list): List of files to upload + + Returns: + tuple: Manifest location and report bucket ARN + """ + print("\nSetting up required resources...") + self.s3_batch_wrapper.create_bucket(bucket_name) + report_bucket_arn = f"arn:aws:s3:::{bucket_name}" + manifest_location = f"arn:aws:s3:::{bucket_name}/job-manifest.csv" + self.s3_batch_wrapper.upload_files_to_bucket(bucket_name, file_names) + return manifest_location, report_bucket_arn + + def run_scenario(self) -> None: + """Run the S3 Batch Operations scenario.""" + account_id = self.s3_batch_wrapper.get_account_id() + bucket_name = f"demo-s3-batch-{str(uuid.uuid4())}" + file_names = [ + "job-manifest.csv", + "object-key-1.txt", + "object-key-2.txt", + "object-key-3.txt", + "object-key-4.txt" + ] + + print(self.DASHES) + print("Welcome to the Amazon S3 Batch basics scenario.") + print(""" + S3 Batch operations enables efficient and cost-effective processing of large-scale + data stored in Amazon S3. It automatically scales resources to handle varying workloads + without the need for manual intervention. + + This Python program walks you through Amazon S3 Batch operations. + """) + + try: + # Deploy CloudFormation stack for IAM roles + print("Deploying CloudFormation stack...") + self.cfn_helper.deploy_cloudformation_stack(self.STACK_NAME) + stack_outputs = self.cfn_helper.get_stack_outputs(self.STACK_NAME) + iam_role_arn = stack_outputs.get('S3BatchRoleArn') + + # Set up S3 bucket and upload test files + manifest_location, report_bucket_arn = self.setup_resources( + bucket_name, file_names + ) + + self.wait_for_input() + + print("\n1. Creating S3 Batch Job...") + job_id = self.s3_batch_wrapper.create_s3_batch_job( + account_id, + iam_role_arn, + manifest_location, + report_bucket_arn + ) + + time.sleep(5) + failure_reasons = self.s3_batch_wrapper.check_job_failure_reasons(job_id, account_id) + if failure_reasons: + print("\nJob failed. Please fix the issues and try again.") + if not q.ask( + "Do you want to proceed with the rest of the operations? (y/n): ", q.is_yesno + ): + raise ValueError("Job failed, stopping execution") + + self.wait_for_input() + print("\n" + self.DASHES) + print("2. Update an existing S3 Batch Operations job's priority") + print("In this step, we modify the job priority value. 
The higher the number, the higher the priority.")
+            self.s3_batch_wrapper.update_job_priority(job_id, account_id)
+
+            self.wait_for_input()
+            print("\n" + self.DASHES)
+            print("3. Cancel the S3 Batch job")
+            cancel_job = q.ask("Do you want to cancel the Batch job? (y/n): ", q.is_yesno)
+            if cancel_job:
+                self.s3_batch_wrapper.cancel_job(job_id, account_id)
+            else:
+                print(f"Job {job_id} was not canceled.")
+
+            self.wait_for_input()
+            print("\n" + self.DASHES)
+            print("4. Describe the job that was just created")
+            self.s3_batch_wrapper.describe_job_details(job_id, account_id)
+
+            self.wait_for_input()
+            print("\n" + self.DASHES)
+            print("5. Describe the tags associated with the job")
+            self.s3_batch_wrapper.get_job_tags(job_id, account_id)
+
+            self.wait_for_input()
+            print("\n" + self.DASHES)
+            print("6. Update Batch Job Tags")
+            self.s3_batch_wrapper.put_job_tags(job_id, account_id)
+
+            self.wait_for_input()
+            print("\n" + self.DASHES)
+            print("7. List Batch Jobs")
+            self.s3_batch_wrapper.list_jobs(account_id)
+
+            self.wait_for_input()
+            print("\n" + self.DASHES)
+            print("8. Delete the Amazon S3 Batch job tagging")
+            delete_tags = q.ask("Do you want to delete Batch job tagging? (y/n): ", q.is_yesno)
+            if delete_tags:
+                self.s3_batch_wrapper.delete_job_tags(job_id, account_id)
+
+            print("\n" + self.DASHES)
+            if q.ask(
+                "Do you want to delete the AWS resources used in this scenario? (y/n): ", q.is_yesno
+            ):
+                self.s3_batch_wrapper.cleanup_resources(bucket_name, file_names)
+                self.cfn_helper.destroy_cloudformation_stack(self.STACK_NAME)
+
+        except Exception as e:
+            print(f"An error occurred: {e}")
+            raise
+
+        print("\nThe Amazon S3 Batch scenario has successfully completed.")
+        print(self.DASHES)
+# snippet-end:[python.example_code.s3control.helper.S3BatchScenario]
+
+def main() -> None:
+    """
+    Main function to run the S3 Batch Operations scenario.
+
+    This example uses the default settings specified in your shared credentials
+    and config files.
+    """
+    s3_client = boto3.client('s3')
+    s3control_client = boto3.client('s3control')
+    sts_client = boto3.client('sts')
+    cfn_client = boto3.client('cloudformation')
+
+    s3_batch_wrapper = S3BatchWrapper(s3_client, s3control_client, sts_client)
+    cfn_helper = CloudFormationHelper(cfn_client)
+
+    scenario = S3BatchScenario(s3_batch_wrapper, cfn_helper)
+    scenario.run_scenario()
+
+
+if __name__ == "__main__":
+    main()
\ No newline at end of file
diff --git a/python/example_code/s3/scenarios/batch/s3_batch_wrapper.py b/python/example_code/s3/scenarios/batch/s3_batch_wrapper.py
new file mode 100644
index 00000000000..686f62daca8
--- /dev/null
+++ b/python/example_code/s3/scenarios/batch/s3_batch_wrapper.py
@@ -0,0 +1,483 @@
+# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+# SPDX-License-Identifier: Apache-2.0
+
+"""
+Wrapper class for AWS S3 Batch Operations.
+"""
+
+import time
+from typing import Dict, List, Any
+
+from botocore.exceptions import ClientError
+
+# snippet-start:[python.example_code.s3control.helper.S3BatchWrapper]
+class S3BatchWrapper:
+    """Wrapper class for managing S3 Batch Operations."""
+
+    def __init__(self, s3_client: Any, s3control_client: Any, sts_client: Any) -> None:
+        """
+        Initializes the S3BatchWrapper with AWS service clients.
+
+        :param s3_client: A Boto3 Amazon S3 client. This client provides low-level
+            access to AWS S3 services.
+        :param s3control_client: A Boto3 Amazon S3 Control client. This client provides
+            low-level access to AWS S3 Control services.
+ :param sts_client: A Boto3 AWS STS client. This client provides low-level + access to AWS STS services. + """ + self.s3_client = s3_client + self.s3control_client = s3control_client + self.sts_client = sts_client + # Get region from the client for bucket creation logic + self.region_name = self.s3_client.meta.region_name + + def get_account_id(self) -> str: + """ + Get AWS account ID. + + Returns: + str: AWS account ID + """ + return self.sts_client.get_caller_identity()["Account"] + + def create_bucket(self, bucket_name: str) -> None: + """ + Create an S3 bucket. + + Args: + bucket_name (str): Name of the bucket to create + + Raises: + ClientError: If bucket creation fails + """ + try: + if self.region_name and self.region_name != 'us-east-1': + self.s3_client.create_bucket( + Bucket=bucket_name, + CreateBucketConfiguration={ + 'LocationConstraint': self.region_name + } + ) + else: + self.s3_client.create_bucket(Bucket=bucket_name) + print(f"Created bucket: {bucket_name}") + except ClientError as e: + print(f"Error creating bucket: {e}") + raise + + def upload_files_to_bucket(self, bucket_name: str, file_names: List[str]) -> str: + """ + Upload files to S3 bucket including manifest file. + + Args: + bucket_name (str): Target bucket name + file_names (list): List of file names to upload + + Returns: + str: ETag of the manifest file + + Raises: + ClientError: If file upload fails + """ + try: + for file_name in file_names: + if file_name != "job-manifest.csv": + content = f"Content for {file_name}" + self.s3_client.put_object( + Bucket=bucket_name, + Key=file_name, + Body=content.encode('utf-8') + ) + print(f"Uploaded {file_name} to {bucket_name}") + + manifest_content = "" + for file_name in file_names: + if file_name != "job-manifest.csv": + manifest_content += f"{bucket_name},{file_name}\n" + + manifest_response = self.s3_client.put_object( + Bucket=bucket_name, + Key="job-manifest.csv", + Body=manifest_content.encode('utf-8') + ) + print(f"Uploaded manifest file to {bucket_name}") + print(f"Manifest content:\n{manifest_content}") + return manifest_response['ETag'].strip('"') + + except ClientError as e: + print(f"Error uploading files: {e}") + raise + + # snippet-start:[python.example_code.s3control.create_job] + def create_s3_batch_job(self, account_id: str, role_arn: str, manifest_location: str, + report_bucket_name: str) -> str: + """ + Create an S3 batch operation job. 
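+
+        The job applies an S3PutObjectTagging operation (tag BatchTag=BatchValue)
+        to every object listed in the manifest, and writes a CSV completion
+        report to the report bucket under the batch-op-reports prefix.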
+
+        Args:
+            account_id (str): AWS account ID
+            role_arn (str): IAM role ARN for batch operations
+            manifest_location (str): Location of the manifest file
+            report_bucket_name (str): Bucket for job reports
+
+        Returns:
+            str: Job ID
+
+        Raises:
+            ClientError: If job creation fails
+        """
+        try:
+            bucket_name = manifest_location.split(':::')[1].split('/')[0]
+            manifest_key = 'job-manifest.csv'
+            manifest_obj = self.s3_client.head_object(
+                Bucket=bucket_name,
+                Key=manifest_key
+            )
+            etag = manifest_obj['ETag'].strip('"')
+
+            response = self.s3control_client.create_job(
+                AccountId=account_id,
+                Operation={
+                    'S3PutObjectTagging': {
+                        'TagSet': [
+                            {
+                                'Key': 'BatchTag',
+                                'Value': 'BatchValue'
+                            },
+                        ]
+                    }
+                },
+                Report={
+                    'Bucket': report_bucket_name,
+                    'Format': 'Report_CSV_20180820',
+                    'Enabled': True,
+                    'Prefix': 'batch-op-reports',
+                    'ReportScope': 'AllTasks'
+                },
+                Manifest={
+                    'Spec': {
+                        'Format': 'S3BatchOperations_CSV_20180820',
+                        'Fields': ['Bucket', 'Key']
+                    },
+                    'Location': {
+                        'ObjectArn': manifest_location,
+                        'ETag': etag
+                    }
+                },
+                Priority=10,
+                RoleArn=role_arn,
+                Description='Batch job for tagging objects',
+                ConfirmationRequired=True
+            )
+            job_id = response['JobId']
+            print(f"The Job id is {job_id}")
+            return job_id
+        except ClientError as e:
+            print(f"Error creating batch job: {e}")
+            error_message = e.response.get('Error', {}).get('Message')
+            if error_message:
+                print(f"Detailed error message: {error_message}")
+            raise
+    # snippet-end:[python.example_code.s3control.create_job]
+
+    def check_job_failure_reasons(self, job_id: str, account_id: str) -> List[Dict[str, Any]]:
+        """
+        Check for any failure reasons of a batch job.
+
+        Args:
+            job_id (str): ID of the batch job
+            account_id (str): AWS account ID
+
+        Returns:
+            list: List of failure reasons
+
+        Raises:
+            ClientError: If checking job failure reasons fails
+        """
+        try:
+            response = self.s3control_client.describe_job(
+                AccountId=account_id,
+                JobId=job_id
+            )
+            if 'FailureReasons' in response['Job']:
+                for reason in response['Job']['FailureReasons']:
+                    print(f"- {reason}")
+            return response['Job'].get('FailureReasons', [])
+        except ClientError as e:
+            print(f"Error checking job failure reasons: {e}")
+            raise
+
+    def wait_for_job_ready(self, job_id: str, account_id: str, desired_status: str = 'Ready') -> bool:
+        """
+        Wait for a job to reach the desired status.
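+
+        Polls describe_job roughly every 20 seconds, for up to 60 attempts
+        (about 20 minutes), before timing out.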
+ + Args: + job_id (str): ID of the batch job + account_id (str): AWS account ID + desired_status (str): Target status to wait for + + Returns: + bool: True if desired status is reached, False otherwise + + Raises: + ClientError: If checking job status fails + """ + print(f"Waiting for job to become {desired_status}...") + max_attempts = 60 + attempt = 0 + while attempt < max_attempts: + try: + response = self.s3control_client.describe_job( + AccountId=account_id, + JobId=job_id + ) + current_status = response['Job']['Status'] + print(f"Current job status: {current_status}") + if current_status == desired_status: + return True + if current_status == 'Suspended': + print("Job is in Suspended state, can proceed with activation") + return True + if current_status in ['Active', 'Failed', 'Cancelled', 'Complete']: + print(f"Job is in {current_status} state, cannot reach {desired_status} status") + if 'FailureReasons' in response['Job']: + print("Failure reasons:") + for reason in response['Job']['FailureReasons']: + print(f"- {reason}") + return False + + time.sleep(20) + attempt += 1 + except ClientError as e: + print(f"Error checking job status: {e}") + raise + print(f"Timeout waiting for job to become {desired_status}") + return False + + # snippet-start:[python.example_code.s3control.update_job_priority] + def update_job_priority(self, job_id: str, account_id: str) -> None: + """ + Update the priority of a batch job and start it. + + Args: + job_id (str): ID of the batch job + account_id (str): AWS account ID + """ + try: + response = self.s3control_client.describe_job( + AccountId=account_id, + JobId=job_id + ) + current_status = response['Job']['Status'] + print(f"Current job status: {current_status}") + + if current_status in ['Ready', 'Suspended']: + self.s3control_client.update_job_priority( + AccountId=account_id, + JobId=job_id, + Priority=60 + ) + print("The job priority was updated") + + try: + self.s3control_client.update_job_status( + AccountId=account_id, + JobId=job_id, + RequestedJobStatus='Ready' + ) + print("Job activated successfully") + except ClientError as activation_error: + print(f"Note: Could not activate job automatically: {activation_error}") + print("Job priority was updated successfully. Job may need manual activation in the console.") + elif current_status in ['Active', 'Completing', 'Complete']: + print(f"Job is in '{current_status}' state - priority cannot be updated") + if current_status == 'Completing': + print("Job is finishing up and will complete soon.") + elif current_status == 'Complete': + print("Job has already completed successfully.") + else: + print("Job is currently running.") + else: + print(f"Job is in '{current_status}' state - priority update not allowed") + + except ClientError as e: + print(f"Error updating job priority: {e}") + print("Continuing with the scenario...") + return + # snippet-end:[python.example_code.s3control.update_job_priority] + + def cancel_job(self, job_id: str, account_id: str) -> None: + """ + Cancel an S3 batch job. 
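+
+        Cancellation is requested by setting the job's RequestedJobStatus to
+        'Cancelled' through the update_job_status operation.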
+ + Args: + job_id (str): ID of the batch job + account_id (str): AWS account ID + """ + try: + self.s3control_client.update_job_status( + AccountId=account_id, + JobId=job_id, + RequestedJobStatus='Cancelled' + ) + print(f"Job {job_id} was successfully canceled.") + except ClientError as e: + print(f"Error canceling job: {e}") + raise + + # snippet-start:[python.example_code.s3control.describe_job] + def describe_job_details(self, job_id: str, account_id: str) -> None: + """ + Describe detailed information about a batch job. + + Args: + job_id (str): ID of the batch job + account_id (str): AWS account ID + """ + try: + response = self.s3control_client.describe_job( + AccountId=account_id, + JobId=job_id + ) + job = response['Job'] + print(f"Job ID: {job['JobId']}") + print(f"Description: {job.get('Description', 'N/A')}") + print(f"Status: {job['Status']}") + print(f"Role ARN: {job['RoleArn']}") + print(f"Priority: {job['Priority']}") + if 'ProgressSummary' in job: + progress = job['ProgressSummary'] + print(f"Progress Summary: Total={progress.get('TotalNumberOfTasks', 0)}, " + f"Succeeded={progress.get('NumberOfTasksSucceeded', 0)}, " + f"Failed={progress.get('NumberOfTasksFailed', 0)}") + except ClientError as e: + print(f"Error describing job: {e}") + raise + # snippet-end:[python.example_code.s3control.describe_job] + + # snippet-start:[python.example_code.s3control.get_job_tagging] + def get_job_tags(self, job_id: str, account_id: str) -> None: + """ + Get tags associated with a batch job. + + Args: + job_id (str): ID of the batch job + account_id (str): AWS account ID + """ + try: + response = self.s3control_client.get_job_tagging( + AccountId=account_id, + JobId=job_id + ) + tags = response.get('Tags', []) + if tags: + print(f"Tags for job {job_id}:") + for tag in tags: + print(f" {tag['Key']}: {tag['Value']}") + else: + print(f"No tags found for job ID: {job_id}") + except ClientError as e: + print(f"Error getting job tags: {e}") + raise + # snippet-end:[python.example_code.s3control.get_job_tagging] + + # snippet-start:[python.example_code.s3control.put_job_tagging] + def put_job_tags(self, job_id: str, account_id: str) -> None: + """ + Add tags to a batch job. + + Args: + job_id (str): ID of the batch job + account_id (str): AWS account ID + """ + try: + self.s3control_client.put_job_tagging( + AccountId=account_id, + JobId=job_id, + Tags=[ + {'Key': 'Environment', 'Value': 'Development'}, + {'Key': 'Team', 'Value': 'DataProcessing'} + ] + ) + print(f"Additional tags were added to job {job_id}") + except ClientError as e: + print(f"Error adding job tags: {e}") + raise + # snippet-end:[python.example_code.s3control.put_job_tagging] + + # snippet-start:[python.example_code.s3control.list_jobs] + def list_jobs(self, account_id: str) -> None: + """ + List all batch jobs for the account. + + Args: + account_id (str): AWS account ID + """ + try: + response = self.s3control_client.list_jobs( + AccountId=account_id, + JobStatuses=['Active', 'Complete', 'Cancelled', 'Failed', 'New', 'Paused', 'Pausing', 'Preparing', 'Ready', 'Suspended'] + ) + jobs = response.get('Jobs', []) + for job in jobs: + print(f"The job id is {job['JobId']}") + print(f"The job priority is {job['Priority']}") + except ClientError as e: + print(f"Error listing jobs: {e}") + raise + # snippet-end:[python.example_code.s3control.list_jobs] + + # snippet-start:[python.example_code.s3control.delete_job_tagging] + def delete_job_tags(self, job_id: str, account_id: str) -> None: + """ + Delete all tags from a batch job. 
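+
+        Note: the DeleteJobTagging operation removes the job's entire tag set
+        in a single call; individual tags can't be removed selectively.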
+
+        Args:
+            job_id (str): ID of the batch job
+            account_id (str): AWS account ID
+        """
+        try:
+            self.s3control_client.delete_job_tagging(
+                AccountId=account_id,
+                JobId=job_id
+            )
+            print(f"You have successfully deleted {job_id} tagging.")
+        except ClientError as e:
+            print(f"Error deleting job tags: {e}")
+            raise
+    # snippet-end:[python.example_code.s3control.delete_job_tagging]
+
+    def cleanup_resources(self, bucket_name: str, file_names: List[str]) -> None:
+        """
+        Clean up all resources created during the scenario.
+
+        Args:
+            bucket_name (str): Name of the bucket to clean up
+            file_names (list): List of files to delete
+
+        Raises:
+            ClientError: If cleanup fails
+        """
+        try:
+            for file_name in file_names:
+                self.s3_client.delete_object(Bucket=bucket_name, Key=file_name)
+                print(f"Deleted {file_name}")
+
+            response = self.s3_client.list_objects_v2(
+                Bucket=bucket_name,
+                Prefix='batch-op-reports/'
+            )
+            if 'Contents' in response:
+                for obj in response['Contents']:
+                    self.s3_client.delete_object(
+                        Bucket=bucket_name,
+                        Key=obj['Key']
+                    )
+                    print(f"Deleted {obj['Key']}")
+
+            self.s3_client.delete_bucket(Bucket=bucket_name)
+            print(f"Deleted bucket {bucket_name}")
+        except ClientError as e:
+            print(f"Error in cleanup: {e}")
+            raise
+# snippet-end:[python.example_code.s3control.helper.S3BatchWrapper]
\ No newline at end of file
diff --git a/python/example_code/s3/scenarios/batch/test/conftest.py b/python/example_code/s3/scenarios/batch/test/conftest.py
new file mode 100644
index 00000000000..774e8801dc7
--- /dev/null
+++ b/python/example_code/s3/scenarios/batch/test/conftest.py
@@ -0,0 +1,36 @@
+# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+# SPDX-License-Identifier: Apache-2.0
+
+"""Shared test fixtures for S3 batch operations tests."""
+
+import boto3
+import pytest
+
+from test_s3_batch_stubber import S3BatchStubber
+from s3_batch_wrapper import S3BatchWrapper
+from cloudformation_helper import CloudFormationHelper
+
+
+class ScenarioData:
+    """Holds data for scenario tests."""
+
+    def __init__(self, wrapper, cfn_helper, stubber):
+        self.wrapper = wrapper
+        self.cfn_helper = cfn_helper
+        self.stubber = stubber
+
+
+@pytest.fixture
+def scenario_data(make_stubber):
+    """Create scenario data with stubbed clients."""
+    s3_client = boto3.client("s3", region_name="us-east-1")
+    s3control_client = boto3.client("s3control", region_name="us-east-1")
+    sts_client = boto3.client("sts", region_name="us-east-1")
+    cfn_client = boto3.client("cloudformation", region_name="us-east-1")
+
+    wrapper = S3BatchWrapper(s3_client, s3control_client, sts_client)
+    cfn_helper = CloudFormationHelper(cfn_client)
+    stubber = make_stubber(S3BatchStubber, s3_client, s3control_client, sts_client)
+
+    return ScenarioData(wrapper, cfn_helper, stubber)
\ No newline at end of file
diff --git a/python/example_code/s3/scenarios/batch/test/test_requirements.txt b/python/example_code/s3/scenarios/batch/test/test_requirements.txt
new file mode 100644
index 00000000000..5df1a8f4ba2
--- /dev/null
+++ b/python/example_code/s3/scenarios/batch/test/test_requirements.txt
@@ -0,0 +1,4 @@
+pytest>=7.0.0
+pytest-mock>=3.10.0
+boto3>=1.26.0
+botocore>=1.29.0
\ No newline at end of file
diff --git a/python/example_code/s3/scenarios/batch/test/test_s3_batch_stubbed.py b/python/example_code/s3/scenarios/batch/test/test_s3_batch_stubbed.py
new file mode 100644
index 00000000000..46b2b005a2a
--- /dev/null
+++ 
b/python/example_code/s3/scenarios/batch/test/test_s3_batch_stubbed.py @@ -0,0 +1,218 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 + +"""Unit tests for S3 batch operations using service method stubbing patterns.""" + +from botocore.exceptions import ClientError +import pytest + + +class MockManager: + def __init__(self, stub_runner, scenario_data, input_mocker): + self.scenario_data = scenario_data + self.stub_runner = stub_runner + self.account_id = "123456789012" + self.bucket_name = "test-batch-bucket" + self.job_id = "test-job-123" + self.role_arn = "arn:aws:iam::123456789012:role/S3BatchRole" + self.manifest_location = f"arn:aws:s3:::{self.bucket_name}/job-manifest.csv" + self.etag = "test-etag-123" + self.file_names = ["job-manifest.csv", "object-key-1.txt", "object-key-2.txt"] + + # Mock user inputs + answers = ["y", "n", "y"] # yes to proceed, no to cancel, yes to cleanup + input_mocker.mock_answers(answers) + + def setup_stubs(self, error, stop_on, stubber): + with self.stub_runner(error, stop_on) as runner: + runner.add(stubber.stub_get_caller_identity, self.account_id) + runner.add(stubber.stub_create_bucket, self.bucket_name) + runner.add(stubber.stub_put_object, self.bucket_name, "object-key-1.txt") + runner.add(stubber.stub_put_object, self.bucket_name, "object-key-2.txt") + runner.add(stubber.stub_put_object, self.bucket_name, "job-manifest.csv", etag=self.etag) + runner.add(stubber.stub_head_object, self.bucket_name, "job-manifest.csv", etag=self.etag) + runner.add(stubber.stub_create_job, self.account_id, self.job_id) + runner.add(stubber.stub_describe_job, self.account_id, self.job_id, status="Suspended") + runner.add(stubber.stub_describe_job, self.account_id, self.job_id, status="Suspended") + runner.add(stubber.stub_update_job_priority, self.account_id, self.job_id) + runner.add(stubber.stub_update_job_status, self.account_id, self.job_id, "Ready") + runner.add(stubber.stub_describe_job, self.account_id, self.job_id, status="Ready") + runner.add(stubber.stub_get_job_tagging, self.account_id, self.job_id, tags=[]) + runner.add(stubber.stub_put_job_tagging, self.account_id, self.job_id) + runner.add(stubber.stub_list_jobs, self.account_id, [{"JobId": self.job_id, "Priority": 60}]) + runner.add(stubber.stub_delete_job_tagging, self.account_id, self.job_id) + + def setup_cleanup_stubs(self, stubber): + with self.stub_runner(None, None) as runner: + for file_name in self.file_names: + runner.add(stubber.stub_delete_object, self.bucket_name, file_name) + runner.add(stubber.stub_list_objects_v2, self.bucket_name, prefix="batch-op-reports/", contents=[]) + runner.add(stubber.stub_delete_bucket, self.bucket_name) + + +@pytest.fixture +def mock_mgr(stub_runner, scenario_data, input_mocker): + return MockManager(stub_runner, scenario_data, input_mocker) + + +def test_get_account_id(mock_mgr, capsys): + mock_mgr.setup_stubs(None, 0, mock_mgr.scenario_data.stubber) + + account_id = mock_mgr.scenario_data.wrapper.get_account_id() + + assert account_id == mock_mgr.account_id + + +def test_create_bucket(mock_mgr, capsys): + mock_mgr.setup_stubs(None, 1, mock_mgr.scenario_data.stubber) + + mock_mgr.scenario_data.wrapper.create_bucket(mock_mgr.bucket_name) + + capt = capsys.readouterr() + assert f"Created bucket: {mock_mgr.bucket_name}" in capt.out + + +def test_upload_files_to_bucket(mock_mgr, capsys): + mock_mgr.setup_stubs(None, 4, mock_mgr.scenario_data.stubber) + + etag = 
mock_mgr.scenario_data.wrapper.upload_files_to_bucket( + mock_mgr.bucket_name, mock_mgr.file_names + ) + + assert etag == mock_mgr.etag + capt = capsys.readouterr() + assert "Uploaded manifest file" in capt.out + + +def test_create_s3_batch_job(mock_mgr, capsys): + mock_mgr.setup_stubs(None, 6, mock_mgr.scenario_data.stubber) + + job_id = mock_mgr.scenario_data.wrapper.create_s3_batch_job( + mock_mgr.account_id, + mock_mgr.role_arn, + mock_mgr.manifest_location, + f"arn:aws:s3:::{mock_mgr.bucket_name}" + ) + + assert job_id == mock_mgr.job_id + capt = capsys.readouterr() + assert f"The Job id is {mock_mgr.job_id}" in capt.out + + +def test_check_job_failure_reasons(mock_mgr): + mock_mgr.setup_stubs(None, 7, mock_mgr.scenario_data.stubber) + + reasons = mock_mgr.scenario_data.wrapper.check_job_failure_reasons( + mock_mgr.job_id, mock_mgr.account_id + ) + + assert reasons == [] + + +def test_update_job_priority(mock_mgr, capsys): + mock_mgr.setup_stubs(None, 10, mock_mgr.scenario_data.stubber) + + mock_mgr.scenario_data.wrapper.update_job_priority( + mock_mgr.job_id, mock_mgr.account_id + ) + + capt = capsys.readouterr() + assert "The job priority was updated" in capt.out + + +def test_describe_job_details(mock_mgr, capsys): + mock_mgr.setup_stubs(None, 11, mock_mgr.scenario_data.stubber) + + mock_mgr.scenario_data.wrapper.describe_job_details( + mock_mgr.job_id, mock_mgr.account_id + ) + + capt = capsys.readouterr() + assert f"Job ID: {mock_mgr.job_id}" in capt.out + + +def test_get_job_tags(mock_mgr, capsys): + mock_mgr.setup_stubs(None, 12, mock_mgr.scenario_data.stubber) + + mock_mgr.scenario_data.wrapper.get_job_tags( + mock_mgr.job_id, mock_mgr.account_id + ) + + capt = capsys.readouterr() + assert f"No tags found for job ID: {mock_mgr.job_id}" in capt.out + + +def test_put_job_tags(mock_mgr, capsys): + mock_mgr.setup_stubs(None, 13, mock_mgr.scenario_data.stubber) + + mock_mgr.scenario_data.wrapper.put_job_tags( + mock_mgr.job_id, mock_mgr.account_id + ) + + capt = capsys.readouterr() + assert f"Additional tags were added to job {mock_mgr.job_id}" in capt.out + + +def test_list_jobs(mock_mgr, capsys): + mock_mgr.setup_stubs(None, 14, mock_mgr.scenario_data.stubber) + + mock_mgr.scenario_data.wrapper.list_jobs(mock_mgr.account_id) + + capt = capsys.readouterr() + assert f"The job id is {mock_mgr.job_id}" in capt.out + assert "The job priority is 60" in capt.out + + +def test_delete_job_tags(mock_mgr, capsys): + mock_mgr.setup_stubs(None, 15, mock_mgr.scenario_data.stubber) + + mock_mgr.scenario_data.wrapper.delete_job_tags( + mock_mgr.job_id, mock_mgr.account_id + ) + + capt = capsys.readouterr() + assert f"You have successfully deleted {mock_mgr.job_id} tagging." 
in capt.out
+
+
+def test_cleanup_resources(mock_mgr, capsys):
+    mock_mgr.setup_cleanup_stubs(mock_mgr.scenario_data.stubber)
+
+    mock_mgr.scenario_data.wrapper.cleanup_resources(
+        mock_mgr.bucket_name, mock_mgr.file_names
+    )
+
+    capt = capsys.readouterr()
+    assert f"Deleted bucket {mock_mgr.bucket_name}" in capt.out
+
+
+@pytest.mark.parametrize(
+    "error, stop_on_index",
+    [
+        ("TESTERROR-stub_get_caller_identity", 0),
+        ("TESTERROR-stub_create_bucket", 1),
+        ("TESTERROR-stub_create_job", 6),
+    ],
+)
+def test_wrapper_errors(mock_mgr, error, stop_on_index):
+    mock_mgr.setup_stubs(error, stop_on_index, mock_mgr.scenario_data.stubber)
+
+    with pytest.raises(ClientError) as exc_info:
+        if "get_caller_identity" in error:
+            mock_mgr.scenario_data.wrapper.get_account_id()
+        elif "create_bucket" in error:
+            mock_mgr.scenario_data.wrapper.create_bucket(mock_mgr.bucket_name)
+        elif "create_job" in error:
+            mock_mgr.scenario_data.wrapper.create_s3_batch_job(
+                mock_mgr.account_id,
+                mock_mgr.role_arn,
+                mock_mgr.manifest_location,
+                f"arn:aws:s3:::{mock_mgr.bucket_name}"
+            )
+
+    assert exc_info.value.response["Error"]["Code"] == error
\ No newline at end of file
diff --git a/python/example_code/s3/scenarios/batch/test/test_s3_batch_stubber.py b/python/example_code/s3/scenarios/batch/test/test_s3_batch_stubber.py
new file mode 100644
index 00000000000..dc334666ce0
--- /dev/null
+++ b/python/example_code/s3/scenarios/batch/test/test_s3_batch_stubber.py
@@ -0,0 +1,190 @@
+# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+# SPDX-License-Identifier: Apache-2.0
+
+"""Stubber functions for S3 batch operations tests."""
+
+from botocore.stub import ANY, Stubber
+
+
+class S3BatchStubber:
+    """Stubber for S3 Batch Operations service methods."""
+
+    def __init__(self, s3_client, s3control_client, sts_client):
+        """Initialize stubbers for all clients."""
+        self.s3_stubber = Stubber(s3_client)
+        self.s3control_stubber = Stubber(s3control_client)
+        self.sts_stubber = Stubber(sts_client)
+
+    def stub_get_caller_identity(self, account_id, error_code=None):
+        """Stub STS get_caller_identity method."""
+        expected_params = {}
+        if error_code is None:
+            response = {"Account": account_id}
+            self.sts_stubber.add_response("get_caller_identity", response, expected_params)
+        else:
+            self.sts_stubber.add_client_error("get_caller_identity", error_code, expected_params=expected_params)
+
+    def stub_create_bucket(self, bucket_name, region=None, error_code=None):
+        """Stub S3 create_bucket method."""
+        expected_params = {"Bucket": bucket_name}
+        if region and region != "us-east-1":
+            expected_params["CreateBucketConfiguration"] = {"LocationConstraint": region}
+
+        if error_code is None:
+            response = {}
+            self.s3_stubber.add_response("create_bucket", response, expected_params)
+        else:
+            self.s3_stubber.add_client_error("create_bucket", error_code, expected_params=expected_params)
+
+    def stub_put_object(self, bucket_name, key, etag="test-etag", error_code=None):
+        """Stub S3 put_object method."""
+        expected_params = {"Bucket": bucket_name, "Key": key, "Body": ANY}
+
+        if error_code is None:
+            response = {"ETag": f'"{etag}"'}
+            self.s3_stubber.add_response("put_object", response, expected_params)
+        else:
+            self.s3_stubber.add_client_error("put_object", error_code,
expected_params=expected_params)
+
+    def stub_head_object(self, bucket_name, key, etag="test-etag", error_code=None):
+        """Stub S3 head_object method."""
+        expected_params = {"Bucket": bucket_name, "Key": key}
+
+        if error_code is None:
+            response = {"ETag": f'"{etag}"'}
+            self.s3_stubber.add_response("head_object", response, expected_params)
+        else:
+            self.s3_stubber.add_client_error("head_object", error_code, expected_params=expected_params)
+
+    def stub_create_job(self, account_id, job_id, error_code=None):
+        """Stub S3Control create_job method."""
+        expected_params = {
+            "AccountId": account_id,
+            "Operation": ANY,
+            "Report": ANY,
+            "Manifest": ANY,
+            "Priority": ANY,
+            "RoleArn": ANY,
+            "Description": ANY,
+            "ConfirmationRequired": ANY
+        }
+
+        if error_code is None:
+            response = {"JobId": job_id}
+            self.s3control_stubber.add_response("create_job", response, expected_params)
+        else:
+            self.s3control_stubber.add_client_error("create_job", error_code, expected_params=expected_params)
+
+    def stub_describe_job(self, account_id, job_id, status="Ready", failure_reasons=None, error_code=None):
+        """Stub S3Control describe_job method."""
+        expected_params = {"AccountId": account_id, "JobId": job_id}
+
+        if error_code is None:
+            job_data = {
+                "JobId": job_id,
+                "Status": status,
+                "Priority": 10,
+                "RoleArn": "arn:aws:iam::123456789012:role/S3BatchRole",
+                "Description": "Batch job for tagging objects"
+            }
+            if failure_reasons:
+                job_data["FailureReasons"] = failure_reasons
+
+            response = {"Job": job_data}
+            self.s3control_stubber.add_response("describe_job", response, expected_params)
+        else:
+            self.s3control_stubber.add_client_error("describe_job", error_code, expected_params=expected_params)
+
+    def stub_update_job_priority(self, account_id, job_id, priority=60, error_code=None):
+        """Stub S3Control update_job_priority method."""
+        expected_params = {"AccountId": account_id, "JobId": job_id, "Priority": priority}
+
+        if error_code is None:
+            response = {}
+            self.s3control_stubber.add_response("update_job_priority", response, expected_params)
+        else:
+            self.s3control_stubber.add_client_error("update_job_priority", error_code, expected_params=expected_params)
+
+    def stub_update_job_status(self, account_id, job_id, status, error_code=None):
+        """Stub S3Control update_job_status method."""
+        expected_params = {"AccountId": account_id, "JobId": job_id, "RequestedJobStatus": status}
+
+        if error_code is None:
+            response = {}
+            self.s3control_stubber.add_response("update_job_status", response, expected_params)
+        else:
+            self.s3control_stubber.add_client_error("update_job_status", error_code, expected_params=expected_params)
+
+    def stub_get_job_tagging(self, account_id, job_id, tags=None, error_code=None):
+        """Stub S3Control get_job_tagging method."""
+        expected_params = {"AccountId": account_id, "JobId": job_id}
+
+        if error_code is None:
+            response = {"Tags": tags or []}
+            self.s3control_stubber.add_response("get_job_tagging", response, expected_params)
+        else:
+            self.s3control_stubber.add_client_error("get_job_tagging", error_code, expected_params=expected_params)
+
+    def stub_put_job_tagging(self, account_id, job_id, error_code=None):
+        """Stub S3Control put_job_tagging method."""
+        expected_params = {"AccountId": account_id, "JobId": job_id, "Tags": ANY}
+
+        if error_code is None:
+            response = {}
+            self.s3control_stubber.add_response("put_job_tagging", response, expected_params)
+        else:
self.s3control_stubber.add_client_error("put_job_tagging", error_code, expected_params=expected_params)
+
+    def stub_list_jobs(self, account_id, jobs=None, error_code=None):
+        """Stub S3Control list_jobs method."""
+        expected_params = {"AccountId": account_id, "JobStatuses": ANY}
+
+        if error_code is None:
+            response = {"Jobs": jobs or []}
+            self.s3control_stubber.add_response("list_jobs", response, expected_params)
+        else:
+            self.s3control_stubber.add_client_error("list_jobs", error_code, expected_params=expected_params)
+
+    def stub_delete_job_tagging(self, account_id, job_id, error_code=None):
+        """Stub S3Control delete_job_tagging method."""
+        expected_params = {"AccountId": account_id, "JobId": job_id}
+
+        if error_code is None:
+            response = {}
+            self.s3control_stubber.add_response("delete_job_tagging", response, expected_params)
+        else:
+            self.s3control_stubber.add_client_error("delete_job_tagging", error_code, expected_params=expected_params)
+
+    def stub_delete_object(self, bucket_name, key, error_code=None):
+        """Stub S3 delete_object method."""
+        expected_params = {"Bucket": bucket_name, "Key": key}
+
+        if error_code is None:
+            response = {}
+            self.s3_stubber.add_response("delete_object", response, expected_params)
+        else:
+            self.s3_stubber.add_client_error("delete_object", error_code, expected_params=expected_params)
+
+    def stub_list_objects_v2(self, bucket_name, prefix=None, contents=None, error_code=None):
+        """Stub S3 list_objects_v2 method."""
+        expected_params = {"Bucket": bucket_name}
+        if prefix:
+            expected_params["Prefix"] = prefix
+
+        if error_code is None:
+            response = {}
+            if contents:
+                response["Contents"] = contents
+            self.s3_stubber.add_response("list_objects_v2", response, expected_params)
+        else:
+            self.s3_stubber.add_client_error("list_objects_v2", error_code, expected_params=expected_params)
+
+    def stub_delete_bucket(self, bucket_name, error_code=None):
+        """Stub S3 delete_bucket method."""
+        expected_params = {"Bucket": bucket_name}
+
+        if error_code is None:
+            response = {}
+            self.s3_stubber.add_response("delete_bucket", response, expected_params)
+        else:
+            self.s3_stubber.add_client_error("delete_bucket", error_code, expected_params=expected_params)
\ No newline at end of file