Skip to content

WIP: pmr setup for tag+partition of alldressed with ft-weborganizer #93

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 15 commits into
base: main
Choose a base branch
from
57 changes: 43 additions & 14 deletions src/cookbook/cli/pmr.py
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,9 @@ def describe_instances(
statuses = statuses or InstanceStatus.active()

# Use provided client or create a new one with the specified region
client = client or boto3.client("ec2", region_name=region or cls.region)
if not client:
session = boto3.Session(profile_name=os.getenv("AWS_PROFILE", "default"))
client = session.client("ec2", region_name=region or cls.region)

filters = []

Expand Down Expand Up @@ -507,7 +509,10 @@ def describe_instance(
Returns:
InstanceInfo object containing the instance details
"""
client = client or boto3.client("ec2", region_name=region or cls.region)
# Use provided client or create a new one with the specified region
if not client:
session = boto3.Session(profile_name=os.getenv("AWS_PROFILE", "default"))
client = session.client("ec2", region_name=region or cls.region)
assert client, "EC2 client is required"

response = client.describe_instances(InstanceIds=[instance_id])
Expand All @@ -524,7 +529,10 @@ def pause(self, client: Union["EC2Client", None] = None, wait_for_completion: bo
Returns:
True if pause was successful, False otherwise
"""
client = client or boto3.client("ec2", region_name=self.region)
# Use provided client or create a new one with the specified region
if not client:
session = boto3.Session(profile_name=os.getenv("AWS_PROFILE", "default"))
client = session.client("ec2", region_name=self.region)
assert client, "EC2 client is required"

# check if the instance is already paused
Expand Down Expand Up @@ -561,7 +569,10 @@ def resume(self, client: Union["EC2Client", None] = None, wait_for_completion: b
Returns:
True if resume was successful, False otherwise
"""
client = client or boto3.client("ec2", region_name=self.region)
# Use provided client or create a new one with the specified region
if not client:
session = boto3.Session(profile_name=os.getenv("AWS_PROFILE", "default"))
client = session.client("ec2", region_name=self.region)
assert client, "EC2 client is required"

# check if the instance is already running
Expand Down Expand Up @@ -598,7 +609,10 @@ def terminate(self, client: Union["EC2Client", None] = None, wait_for_terminatio
Returns:
True if termination was successful, False otherwise
"""
client = client or boto3.client("ec2", region_name=self.region)
# Use provided client or create a new one with the specified region
if not client:
session = boto3.Session(profile_name=os.getenv("AWS_PROFILE", "default"))
client = session.client("ec2", region_name=self.region)
assert client, "EC2 client is required"

try:
Expand Down Expand Up @@ -637,7 +651,10 @@ def get_latest_ami_id(cls, instance_type: str, client: Union["SSMClient", None]
else:
image_id = "/aws/service/ami-amazon-linux-latest/al2023-ami-kernel-default-x86_64"

client = client or boto3.client("ssm")
# Use provided client or create a new one with the specified region
if not client:
session = boto3.Session(profile_name=os.getenv("AWS_PROFILE", "default"))
client = session.client("ssm", region_name=os.getenv("AWS_REGION", "us-east-1"))
assert client, "SSM client is required"

parameter = client.get_parameter(Name=image_id, WithDecryption=False)
Expand Down Expand Up @@ -678,9 +695,15 @@ def create_instance(
InstanceInfo object representing the newly created EC2 instance
"""
# Initialize the EC2 client with the specified region
client = client or boto3.client("ec2", region_name=region)
if not client:
session = boto3.Session(profile_name=os.getenv("AWS_PROFILE", "default"))
client = session.client("ec2", region_name=region or cls.region)
assert client, "EC2 client is required"

vpcs = client.describe_vpcs()["Vpcs"]
vpc_id = vpcs[0]["VpcId"]
print(f"Using VPC ID: {vpc_id}")

# If AMI ID is not provided, use a default Amazon Linux 2023 AMI (x86_64 or arm64 based on instance type)
ami_id = ami_id or cls.get_latest_ami_id(instance_type)

Expand Down Expand Up @@ -901,8 +924,8 @@ def import_ssh_key_to_ec2(key_name: str, region: str, private_key_path: str) ->
Returns:
The key pair ID if the import was successful.
"""
# Initialize the EC2 client with the specified region
ec2_client = boto3.client("ec2", region_name=region)
session = boto3.Session(profile_name=os.getenv("AWS_PROFILE", "default"))
ec2_client = session.client("ec2", region_name=region or os.getenv("AWS_REGION", "us-east-1"))

# Use default SSH private key path if not specified
if not private_key_path:
Expand Down Expand Up @@ -1210,7 +1233,8 @@ def create_instances(
logger.info("No existing instances found. Starting with index 0")

# Initialize the EC2 client with the specified region
ec2_client = boto3.client("ec2", region_name=region)
session = boto3.Session(profile_name=os.getenv("AWS_PROFILE", "default"))
ec2_client = session.client("ec2", region_name=region)
logger.debug(f"Initialized EC2 client for region {region}")

instances = []
Expand Down Expand Up @@ -1257,7 +1281,8 @@ def list_instances(
"""
logger.info(f"Listing instances with project={name} in region {region}")

client = boto3.client("ec2", region_name=region)
session = boto3.Session(profile_name=os.getenv("AWS_PROFILE", "default"))
client = session.client("ec2", region_name=region)

# Retrieve matching instances
instances = InstanceInfo.describe_instances(
Expand Down Expand Up @@ -1306,7 +1331,8 @@ def terminate_instances(
"""
logger.info(f"Terminating instances with project={name} in region {region}")

client = boto3.client("ec2", region_name=region)
session = boto3.Session(profile_name=os.getenv("AWS_PROFILE", "default"))
client = session.client("ec2", region_name=region)

# Retrieve instances matching the project and owner tags
instances = InstanceInfo.describe_instances(
Expand Down Expand Up @@ -1354,7 +1380,8 @@ def pause_instances(
"""
logger.info(f"Pausing instances with project={name} in region {region}")

client = boto3.client("ec2", region_name=region)
session = boto3.Session(profile_name=os.getenv("AWS_PROFILE", "default"))
client = session.client("ec2", region_name=region)

# Retrieve instances matching the project and owner tags
instances = InstanceInfo.describe_instances(
Expand Down Expand Up @@ -1398,7 +1425,9 @@ def resume_instances(
instance_id: Optional list of specific instance IDs to resume
detach: Whether to return immediately without waiting for resume to complete
"""
client = boto3.client("ec2", region_name=region)

session = boto3.Session(profile_name=os.getenv("AWS_PROFILE", "default"))
client = session.client("ec2", region_name=region)

logger.info(f"Resuming instances with project={name} in region {region}")

Expand Down
101 changes: 101 additions & 0 deletions src/cookbook/recipes/pmr/alldressed-tagging/build_tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
import os
import argparse

# Shell-script template for processing one chunk of the all-dressed dataset:
# download the fastText models (if missing), pull one chunk from S3, run the
# datamap-rs tag + partition steps, relocate outputs into per-label
# directories, and upload results plus logs back to S3. The literal
# placeholder XXX is substituted with the zero-padded chunk index.
template = r"""#!/bin/bash

set -e

# Check if /mnt/raid0/models is empty and download artifacts if needed
if [ ! -d "/mnt/raid0/models" ] || [ -z "$(ls -A /mnt/raid0/models)" ]; then
echo "Models directory is empty, downloading artifacts..."
mkdir -p "/mnt/raid0/models"
s5cmd cp -sp s3://ai2-llm/pretraining-data/sources/WebOrganizer/fasttext/models/Topic/may31_lr05_ng3_n3M6_ova_combined-v3.bin /mnt/raid0/models/
s5cmd cp -sp s3://ai2-llm/pretraining-data/sources/dclm/refinedweb/dolma_reformat/pools/fasttext_models/oh_uc_wc_eli5_fasttext_model_bigram_200k.bin /mnt/raid0/models/
else
echo "Models directory already contains files, skipping download..."
fi

SRC_S3_PREFIX="s3://ai2-llm/pretraining-data/sources/cc_all_dressed/all_dressed_v3/sa_minlen500/filtered"
DST_S3_PREFIX="s3://ai2-llm/pretraining-data/sources/cc_all_dressed/all_dressed_v3/sa_minlen500/filtered_may31_lr05_ng3_n3M6_ova_combined-v3-partitioned"

# Store the input argument
X=XXX

# Step 0: Prepare runtime and local storage
echo "Preparing runtime..."
rm -rf "/mnt/raid0/input"
rm -rf "/mnt/raid0/output"
rm -rf "/mnt/raid0/logs"

mkdir -p "/mnt/raid0/input"
mkdir -p "/mnt/raid0/output"
mkdir -p "/mnt/raid0/logs"

cd ~/datamap-rs
git checkout undfined/tag-alldressed; git pull


# Step 1: Copy from S3 to local storage
echo "Copying data from S3 to local storage..."
s5cmd cp -sp "$SRC_S3_PREFIX/${X}/*" "/mnt/raid0/input/"


# Step 2: Run the tag operation
echo "Running tag operation..."
cargo run --release -- map --input-dir /mnt/raid0/input --output-dir /mnt/raid0/input/annotated/ --config examples/tag_alldressed/tag-docs.yaml > "/mnt/raid0/logs/tag-docs-${X}.log"


# Step 3: Run the partition operation
echo "Running partition operation..."
cargo run --release -- partition --input-dir /mnt/raid0/input/annotated/step_final/ --output-dir /mnt/raid0/input/partitioned/ --config examples/tag_alldressed/partition-docs.yaml > "/mnt/raid0/logs/partition-docs-${X}.log"


# Step 4: Relocate partitioned files under category directories
echo "Relocating partitioned files..."
OUTPUT_DIR="/mnt/raid0/output/partitioned"
mkdir -p "$OUTPUT_DIR"

# Create directories and move files based on labels
echo "Looking for files matching pattern: /mnt/raid0/input/partitioned/chunk___*__*.jsonl.zst"
for file in /mnt/raid0/input/partitioned/chunk___*__*.jsonl.zst; do
# Skip the literal pattern when the glob matched nothing, so mv does not
# fail on a nonexistent path under set -e
[ -e "$file" ] || continue

# Extract the label from the filename
label=$(basename "$file" | sed 's/.*__\([^.]*\)\..*/\1/')

# Fix typo in label electronics_and_hardare
label=$(echo "$label" | sed 's/electronics_and_hardare/electronics_and_hardware/g')

# Extract the new filename (remove chunk___*__ prefix)
new_filename=$(basename "$file" | sed 's/chunk___[^_]*__//')

# Fix typo in new filename
new_filename=$(echo "$new_filename" | sed 's/electronics_and_hardare/electronics_and_hardware/g')

# Create directory if it doesn't exist
mkdir -p "$OUTPUT_DIR/$label"

# Move the file
mv "$file" "$OUTPUT_DIR/$label/$new_filename"

echo "Moved $file to $OUTPUT_DIR/$label/$new_filename"
done


# Step 5: Copy partitioned files to S3
echo "Copying output to S3..."
s5cmd cp -sp "/mnt/raid0/output/partitioned/*" "$DST_S3_PREFIX/${X}/"
s5cmd cp -sp "/mnt/raid0/logs/*.log" "$DST_S3_PREFIX/logs/"

echo "Processing complete for chunk $X"
"""


def write_task_scripts(output_dir: str, num_parts: int = 32) -> list:
    """Render one ``part_NN.sh`` script per chunk into *output_dir*.

    Args:
        output_dir: Directory that receives the generated scripts
            (created if it does not exist).
        num_parts: Number of chunk scripts to generate. Defaults to 32 to
            match the original hard-coded chunk count.

    Returns:
        Paths of the files written, in chunk order.
    """
    os.makedirs(output_dir, exist_ok=True)
    written = []
    for index in range(num_parts):
        # Zero-padded two-digit chunk id, e.g. "00" .. "31".
        chunk = "%02d" % index
        path = os.path.join(output_dir, f"part_{chunk}.sh")
        with open(path, "w") as handle:
            handle.write(template.replace("XXX", chunk))
        written.append(path)
    return written


def main() -> None:
    """CLI entry point: parse arguments and emit the per-chunk scripts."""
    parser = argparse.ArgumentParser(
        description="Generate per-chunk tag/partition shell scripts."
    )
    parser.add_argument("--output-dir", required=True, help="Directory to write shell scripts to")
    parser.add_argument(
        "--num-parts",
        type=int,
        default=32,
        help="Number of chunk scripts to generate (default: 32)",
    )
    args = parser.parse_args()
    write_task_scripts(args.output_dir, args.num_parts)


if __name__ == "__main__":
    main()
85 changes: 85 additions & 0 deletions src/cookbook/recipes/pmr/alldressed-tagging/tasks/part_00.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
#!/bin/bash
# Generated from the template in build_tasks.py (chunk 00). Prefer editing
# the template and regenerating rather than editing this file directly.
#
# Pipeline: download fastText models if missing, copy chunk 00 from S3,
# run datamap-rs tag + partition, sort partitioned files into per-label
# directories, then upload the results and logs back to S3.

set -e

# Check if /mnt/raid0/models is empty and download artifacts if needed
if [ ! -d "/mnt/raid0/models" ] || [ -z "$(ls -A /mnt/raid0/models)" ]; then
echo "Models directory is empty, downloading artifacts..."
mkdir -p "/mnt/raid0/models"
s5cmd cp -sp s3://ai2-llm/pretraining-data/sources/WebOrganizer/fasttext/models/Topic/may31_lr05_ng3_n3M6_ova_combined-v3.bin /mnt/raid0/models/
s5cmd cp -sp s3://ai2-llm/pretraining-data/sources/dclm/refinedweb/dolma_reformat/pools/fasttext_models/oh_uc_wc_eli5_fasttext_model_bigram_200k.bin /mnt/raid0/models/
else
echo "Models directory already contains files, skipping download..."
fi

SRC_S3_PREFIX="s3://ai2-llm/pretraining-data/sources/cc_all_dressed/all_dressed_v3/sa_minlen500/filtered"
DST_S3_PREFIX="s3://ai2-llm/pretraining-data/sources/cc_all_dressed/all_dressed_v3/sa_minlen500/filtered_may31_lr05_ng3_n3M6_ova_combined-v3-partitioned"

# Store the input argument
X=00

# Step 0: Prepare runtime and local storage
echo "Preparing runtime..."
rm -rf "/mnt/raid0/input"
rm -rf "/mnt/raid0/output"
rm -rf "/mnt/raid0/logs"

mkdir -p "/mnt/raid0/input"
mkdir -p "/mnt/raid0/output"
mkdir -p "/mnt/raid0/logs"

cd ~/datamap-rs
git checkout undfined/tag-alldressed; git pull


# Step 1: Copy from S3 to local storage
echo "Copying data from S3 to local storage..."
s5cmd cp -sp "$SRC_S3_PREFIX/${X}/*" "/mnt/raid0/input/"


# Step 2: Run the tag operation
echo "Running tag operation..."
cargo run --release -- map --input-dir /mnt/raid0/input --output-dir /mnt/raid0/input/annotated/ --config examples/tag_alldressed/tag-docs.yaml > "/mnt/raid0/logs/tag-docs-${X}.log"


# Step 3: Run the partition operation
echo "Running partition operation..."
cargo run --release -- partition --input-dir /mnt/raid0/input/annotated/step_final/ --output-dir /mnt/raid0/input/partitioned/ --config examples/tag_alldressed/partition-docs.yaml > "/mnt/raid0/logs/partition-docs-${X}.log"


# Step 4: Relocate partitioned files under category directories
echo "Relocating partitioned files..."
OUTPUT_DIR="/mnt/raid0/output/partitioned"
mkdir -p "$OUTPUT_DIR"

# Create directories and move files based on labels
echo "Looking for files matching pattern: /mnt/raid0/input/partitioned/chunk___*__*.jsonl.zst"
for file in /mnt/raid0/input/partitioned/chunk___*__*.jsonl.zst; do
# Skip the literal pattern when the glob matched nothing, so mv does not
# fail on a nonexistent path under set -e
[ -e "$file" ] || continue

# Extract the label from the filename
label=$(basename "$file" | sed 's/.*__\([^.]*\)\..*/\1/')

# Fix typo in label electronics_and_hardare
label=$(echo "$label" | sed 's/electronics_and_hardare/electronics_and_hardware/g')

# Extract the new filename (remove chunk___*__ prefix)
new_filename=$(basename "$file" | sed 's/chunk___[^_]*__//')

# Fix typo in new filename
new_filename=$(echo "$new_filename" | sed 's/electronics_and_hardare/electronics_and_hardware/g')

# Create directory if it doesn't exist
mkdir -p "$OUTPUT_DIR/$label"

# Move the file
mv "$file" "$OUTPUT_DIR/$label/$new_filename"

echo "Moved $file to $OUTPUT_DIR/$label/$new_filename"
done


# Step 5: Copy partitioned files to S3
echo "Copying output to S3..."
s5cmd cp -sp "/mnt/raid0/output/partitioned/*" "$DST_S3_PREFIX/${X}/"
s5cmd cp -sp "/mnt/raid0/logs/*.log" "$DST_S3_PREFIX/logs/"

echo "Processing complete for chunk $X"
Loading