Skip to content

ACP client-only examples #20

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 122 additions & 0 deletions examples/client_workflows/buyer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
import os
import time
from datetime import datetime, timedelta, timezone
from dotenv import load_dotenv

from virtuals_acp import VirtualsACP, ACPJob, ACPJobPhase
from virtuals_acp.configs import BASE_SEPOLIA_CONFIG

load_dotenv(override=True)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would suggest to add a .env.example


BUYER_PRIVATE_KEY = os.environ.get("WHITELISTED_WALLET_PRIVATE_KEY")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could you import envs from EnvSettings as well so that validations are done first? https://github.com/Virtual-Protocol/acp-python/blob/feat_client_examples/examples/acp_base/self_evaluation/buyer.py#L16

BUYER_WALLET_ADDRESS = os.environ.get("BUYER_AGENT_WALLET_ADDRESS")

# --- Configuration for the job ---
POLL_INTERVAL_SECONDS = 10
TARGET_SELLER_WALLET = os.environ.get("SELLER_AGENT_WALLET_ADDRESS")
TARGET_EVALUATOR_WALLET = os.environ.get("EVALUATOR_AGENT_WALLET_ADDRESS")
SERVICE_PRICE = "1"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SERVICE_REQUIREMENT = {
"prompt": "Create a funny cat meme for a Python SDK example",
"format": "png"
}
# ----------------------------------

if not all([BUYER_PRIVATE_KEY, BUYER_WALLET_ADDRESS, TARGET_SELLER_WALLET, TARGET_EVALUATOR_WALLET]):
print("Error: Ensure BUYER_PRIVATE_KEY, BUYER_WALLET_ADDRESS, SELLER_WALLET_ADDRESS, EVALUATOR_WALLET_ADDRESS are set.")
exit(1)

def main():

print("--- Buyer Script ---")
buyer_acp = VirtualsACP(
wallet_private_key=BUYER_PRIVATE_KEY,
agent_wallet_address=BUYER_WALLET_ADDRESS,
config=BASE_SEPOLIA_CONFIG
)
print(f"Buyer ACP Initialized. Agent: {buyer_acp.agent_address}")

# 1. Initiate Job
print(f"\nInitiating job with Seller: {TARGET_SELLER_WALLET}, Evaluator: {TARGET_EVALUATOR_WALLET}")
expired_at = datetime.now(timezone.utc) + timedelta(days=1)

onchain_job_id = buyer_acp.initiate_job(
provider_address=TARGET_SELLER_WALLET,
service_requirement=SERVICE_REQUIREMENT, # Already a dict
expired_at=expired_at,
evaluator_address=TARGET_EVALUATOR_WALLET,
amount=float(SERVICE_PRICE) # Pass actual price for API record
)
print(f"Job {onchain_job_id} initiated.")

# 2. Wait for Seller's acceptance memo (which sets next_phase to TRANSACTION)
print(f"\nWaiting for Seller to accept job {onchain_job_id}...")
memo_to_sign_for_payment_id = None

while True:
# wait for some time before checking job again
time.sleep(POLL_INTERVAL_SECONDS)
job_details: ACPJob = buyer_acp.get_job_by_onchain_id(onchain_job_id)
print(f"Polling Job {onchain_job_id}: Current Phase: {job_details.phase.name}")

if job_details.phase == ACPJobPhase.NEGOTIATION:
# Seller has responded. Find the memo they created.
# This memo will have next_phase = TRANSACTION.
found_seller_memo = False
for memo in reversed(job_details.memos): # Check latest memo first
# Find the seller's response memo.
if ACPJobPhase(memo.next_phase) == ACPJobPhase.TRANSACTION:
memo_to_sign_for_payment_id = memo.id
print(f"Seller accepted. Buyer will sign seller's memo {memo_to_sign_for_payment_id} for payment.")
found_seller_memo = True
break
if found_seller_memo:
break # Exit while loop

elif job_details.phase == ACPJobPhase.REJECTED:
print(f"Job {onchain_job_id} was rejected by seller.")
return # Exit main function
elif job_details.phase == ACPJobPhase.REQUEST:
print(f"Job {onchain_job_id} still in REQUEST phase. Waiting for seller...")

if not memo_to_sign_for_payment_id:
print(f"Could not identify seller's acceptance memo for job {onchain_job_id}. Exiting.")
return

# 3. Pay for the Job
print(f"\nPaying for job {onchain_job_id} (Price: {SERVICE_PRICE}), by signing memo {memo_to_sign_for_payment_id}...")
try:
buyer_acp.pay_for_job(
job_id=onchain_job_id,
memo_id=memo_to_sign_for_payment_id,
amount=SERVICE_PRICE,
reason="Payment for meme generation service."
)
print(f"Payment process initiated for job {onchain_job_id}.")
except Exception as e:
print(f"Error paying for job {onchain_job_id}: {e}")
return

# 4. Wait for Job Completion
print(f"\nWaiting for job {onchain_job_id} to be completed by evaluator...")
while True:
time.sleep(POLL_INTERVAL_SECONDS)
job_details = buyer_acp.get_job_by_onchain_id(onchain_job_id)
print(f"Polling Job {onchain_job_id}: Current Phase: {job_details.phase.name}")

if job_details.phase == ACPJobPhase.COMPLETED:
print(f"Job {onchain_job_id} successfully COMPLETED!")
break
elif job_details.phase == ACPJobPhase.REJECTED:
print(f"Job {onchain_job_id} was REJECTED during/after evaluation.")
break
elif job_details.phase == ACPJobPhase.EVALUATION:
print(f"Job {onchain_job_id} is in EVALUATION. Waiting for evaluator's decision...")
elif job_details.phase == ACPJobPhase.TRANSACTION:
print(f"Job {onchain_job_id} is in TRANSACTION. Waiting for seller to deliver...")
# else, keep polling for other phases

print("\n--- Buyer Script Finished ---")

if __name__ == "__main__":
main()
90 changes: 90 additions & 0 deletions examples/client_workflows/evaluator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import os
import time
from typing import List
from dotenv import load_dotenv

from virtuals_acp import VirtualsACP, ACPJob, ACPJobPhase
from virtuals_acp.configs import BASE_SEPOLIA_CONFIG

load_dotenv(override=True)

EVALUATOR_PRIVATE_KEY = os.environ.get("WHITELISTED_WALLET_PRIVATE_KEY")
EVALUATOR_WALLET_ADDRESS = os.environ.get("EVALUATOR_AGENT_WALLET_ADDRESS")

POLL_INTERVAL_SECONDS = 20

if not all([EVALUATOR_PRIVATE_KEY, EVALUATOR_WALLET_ADDRESS]):
print("Error: Ensure EVALUATOR_PRIVATE_KEY and EVALUATOR_WALLET_ADDRESS are set.")
exit(1)

def main():
print("--- Evaluator Script (Simplified Direct Client Usage) ---")
evaluator_acp = VirtualsACP(
wallet_private_key=EVALUATOR_PRIVATE_KEY,
agent_wallet_address=EVALUATOR_WALLET_ADDRESS,
config=BASE_SEPOLIA_CONFIG
)
print(f"Evaluator ACP Initialized. Agent: {evaluator_acp.agent_address}")

evaluated_deliverables = set() # Store (job_id, deliverable_memo_id) to avoid re-evaluation

while True:
print(f"\nEvaluator: Polling for jobs assigned to {EVALUATOR_WALLET_ADDRESS} requiring evaluation...")
active_jobs_list: List[ACPJob] = evaluator_acp.get_active_jobs()

if not active_jobs_list:
print("Evaluator: No active jobs found in this poll.")
time.sleep(POLL_INTERVAL_SECONDS)
continue

for job_summary in active_jobs_list:
onchain_job_id = job_summary.id

try:
job_details = evaluator_acp.get_job_by_onchain_id(onchain_job_id)
current_phase = job_details.phase

if current_phase == ACPJobPhase.EVALUATION:
print(f"Evaluator: Found Job {onchain_job_id} in EVALUATION phase.")

# Find the seller's deliverable memo. Its next_phase should be COMPLETED.
seller_deliverable_memo_to_sign = None
for memo in reversed(job_details.memos): # Check latest first
if ACPJobPhase(memo.next_phase) == ACPJobPhase.COMPLETED:
seller_deliverable_memo_to_sign = memo
break

if seller_deliverable_memo_to_sign:
deliverable_key = (onchain_job_id, seller_deliverable_memo_to_sign.id)
if deliverable_key in evaluated_deliverables:
# print(f"Deliverable memo {seller_deliverable_memo_to_sign.id} for job {onchain_job_id} already processed.")
continue

print(f" Job {onchain_job_id}: Found deliverable memo {seller_deliverable_memo_to_sign.id} to evaluate.")
print(f" Deliverable Content: {seller_deliverable_memo_to_sign.content}")

# Simple evaluation logic: always accept
accept_the_delivery = True
evaluation_reason = "Deliverable looks great, approved!"

print(f" Job {onchain_job_id}: Evaluating... Accepting: {accept_the_delivery}")
evaluator_acp.evaluate_job_delivery(
memo_id_of_deliverable=seller_deliverable_memo_to_sign.id,
accept=accept_the_delivery,
reason=evaluation_reason
)
print(f" Job {onchain_job_id}: Evaluation submitted for memo {seller_deliverable_memo_to_sign.id}.")
evaluated_deliverables.add(deliverable_key)
else:
print(f" Job {onchain_job_id} in EVALUATION, but no deliverable memo (next_phase=COMPLETED) found yet.")
elif current_phase in [ACPJobPhase.COMPLETED, ACPJobPhase.REJECTED]:
print(f"Evaluator: Job {onchain_job_id} is already in {current_phase.name}. No action.")
# Potentially add to a "handled" set if not using evaluated_deliverables

except Exception as e:
print(f"Evaluator: Error processing job {onchain_job_id}: {e}")

time.sleep(POLL_INTERVAL_SECONDS)

if __name__ == "__main__":
main()
121 changes: 121 additions & 0 deletions examples/client_workflows/seller.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
import os
import time
import json
from typing import List
from dotenv import load_dotenv

from virtuals_acp import VirtualsACP, ACPJob, ACPJobPhase
from virtuals_acp.configs import BASE_SEPOLIA_CONFIG

load_dotenv(override=True)

SELLER_PRIVATE_KEY = os.environ.get("WHITELISTED_WALLET_PRIVATE_KEY")
SELLER_WALLET_ADDRESS = os.environ.get("SELLER_AGENT_WALLET_ADDRESS")

if not all([SELLER_PRIVATE_KEY, SELLER_WALLET_ADDRESS]):
print("Error: Ensure SELLER_PRIVATE_KEY and SELLER_WALLET_ADDRESS are set.")
exit(1)

DELIVERABLE_CONTENT = {
"type": "url",
"value": "https://virtuals.io/delivered_meme.png",
"notes": "Your meme is ready!"
}

POLL_INTERVAL_SECONDS = 20

def main():
print("--- Seller Script ---")
seller_acp = VirtualsACP(
wallet_private_key=SELLER_PRIVATE_KEY,
agent_wallet_address=SELLER_WALLET_ADDRESS,
config=BASE_SEPOLIA_CONFIG
)
print(f"Seller ACP Initialized. Agent: {seller_acp.agent_address}")

# Keep track of jobs to avoid reprocessing in this simple loop
# job_id: {"responded_to_request": bool, "delivered_work": bool}
processed_job_stages = {}

while True:
print(f"\nSeller: Polling for active jobs for {SELLER_WALLET_ADDRESS}...")
active_jobs_list: List[ACPJob] = seller_acp.get_active_jobs()

if not active_jobs_list:
print("Seller: No active jobs found in this poll.")
time.sleep(POLL_INTERVAL_SECONDS)
continue

for job_summary in active_jobs_list:
onchain_job_id = job_summary.id

# Ensure this job is for the current seller
if job_summary.provider_address != seller_acp.agent_address:
continue

job_stages = processed_job_stages.get(onchain_job_id, {})

try:
# Fetch full details to get current phase and memos
job_details = seller_acp.get_job_by_onchain_id(onchain_job_id)
current_phase = job_details.phase
print(f"Seller: Checking job {onchain_job_id}. Current Phase: {current_phase.name}")

# 1. Respond to Job Request (if not already responded)
if current_phase == ACPJobPhase.REQUEST and not job_stages.get("responded_to_request"):
# Buyer's initial memo will have next_phase = NEGOTIATION
buyers_initial_memo_to_sign = None
for memo in reversed(job_details.memos):
if ACPJobPhase(memo.next_phase) == ACPJobPhase.NEGOTIATION:
buyers_initial_memo_to_sign = memo
break

if buyers_initial_memo_to_sign:
print(f"Seller: Job {onchain_job_id} is in REQUEST. Responding to buyer's memo {buyers_initial_memo_to_sign.id}...")
seller_acp.respond_to_job_memo(
job_id=onchain_job_id,
memo_id=buyers_initial_memo_to_sign.id, # ID of the memo created by Buyer
accept=True,
reason="Seller accepts the job offer."
)
print(f"Seller: Accepted job {onchain_job_id}. Job phase should move to NEGOTIATION.")
job_stages["responded_to_request"] = True
else:
print(f"Seller: Job {onchain_job_id} in REQUEST, but could not find buyer's initial memo.")

# 2. Submit Deliverable (if job is paid and not yet delivered)
elif current_phase == ACPJobPhase.TRANSACTION and not job_stages.get("delivered_work"):
# Buyer has paid, job is in TRANSACTION. Seller needs to deliver.
# The latest memo from buyer would have next_phase = EVALUATION
buyers_payment_confirmation_memo = None
for memo in reversed(job_details.memos):
if ACPJobPhase(memo.next_phase) == ACPJobPhase.EVALUATION:
buyers_payment_confirmation_memo = memo
break

if buyers_payment_confirmation_memo:
print(f"Seller: Job {onchain_job_id} is PAID (TRANSACTION phase). Submitting deliverable...")
seller_acp.submit_job_deliverable(
job_id=onchain_job_id,
deliverable_content=json.dumps(DELIVERABLE_CONTENT)
)
print(f"Seller: Deliverable submitted for job {onchain_job_id}. Job should move to EVALUATION.")
job_stages["delivered_work"] = True
else:
print(f"Seller: Job {onchain_job_id} in TRANSACTION, but couldn't find buyer's payment memo to confirm.")

elif current_phase in [ACPJobPhase.EVALUATION, ACPJobPhase.COMPLETED, ACPJobPhase.REJECTED]:
print(f"Seller: Job {onchain_job_id} is in {current_phase.name}. No further action for seller.")
# Mark as fully handled for this script
job_stages["responded_to_request"] = True
job_stages["delivered_work"] = True

processed_job_stages[onchain_job_id] = job_stages

except Exception as e:
print(f"Seller: Error processing job {onchain_job_id}: {e}")

time.sleep(POLL_INTERVAL_SECONDS)

if __name__ == "__main__":
main()