diff --git a/examples/client_workflows/buyer.py b/examples/client_workflows/buyer.py new file mode 100644 index 0000000..627a717 --- /dev/null +++ b/examples/client_workflows/buyer.py @@ -0,0 +1,122 @@ +import os +import time +from datetime import datetime, timedelta, timezone +from dotenv import load_dotenv + +from virtuals_acp import VirtualsACP, ACPJob, ACPJobPhase +from virtuals_acp.configs import BASE_SEPOLIA_CONFIG + +load_dotenv(override=True) + +BUYER_PRIVATE_KEY = os.environ.get("WHITELISTED_WALLET_PRIVATE_KEY") +BUYER_WALLET_ADDRESS = os.environ.get("BUYER_AGENT_WALLET_ADDRESS") + +# --- Configuration for the job --- +POLL_INTERVAL_SECONDS = 10 +TARGET_SELLER_WALLET = os.environ.get("SELLER_AGENT_WALLET_ADDRESS") +TARGET_EVALUATOR_WALLET = os.environ.get("EVALUATOR_AGENT_WALLET_ADDRESS") +SERVICE_PRICE = "1" +SERVICE_REQUIREMENT = { + "prompt": "Create a funny cat meme for a Python SDK example", + "format": "png" +} +# ---------------------------------- + +if not all([BUYER_PRIVATE_KEY, BUYER_WALLET_ADDRESS, TARGET_SELLER_WALLET, TARGET_EVALUATOR_WALLET]): + print("Error: Ensure BUYER_PRIVATE_KEY, BUYER_WALLET_ADDRESS, SELLER_WALLET_ADDRESS, EVALUATOR_WALLET_ADDRESS are set.") + exit(1) + +def main(): + + print("--- Buyer Script ---") + buyer_acp = VirtualsACP( + wallet_private_key=BUYER_PRIVATE_KEY, + agent_wallet_address=BUYER_WALLET_ADDRESS, + config=BASE_SEPOLIA_CONFIG + ) + print(f"Buyer ACP Initialized. Agent: {buyer_acp.agent_address}") + + # 1. Initiate Job + print(f"\nInitiating job with Seller: {TARGET_SELLER_WALLET}, Evaluator: {TARGET_EVALUATOR_WALLET}") + expired_at = datetime.now(timezone.utc) + timedelta(days=1) + + onchain_job_id = buyer_acp.initiate_job( + provider_address=TARGET_SELLER_WALLET, + service_requirement=SERVICE_REQUIREMENT, # Already a dict + expired_at=expired_at, + evaluator_address=TARGET_EVALUATOR_WALLET, + amount=float(SERVICE_PRICE) # Pass actual price for API record + ) + print(f"Job {onchain_job_id} initiated.") + + # 2. Wait for Seller's acceptance memo (which sets next_phase to TRANSACTION) + print(f"\nWaiting for Seller to accept job {onchain_job_id}...") + memo_to_sign_for_payment_id = None + + while True: + # wait for some time before checking job again + time.sleep(POLL_INTERVAL_SECONDS) + job_details: ACPJob = buyer_acp.get_job_by_onchain_id(onchain_job_id) + print(f"Polling Job {onchain_job_id}: Current Phase: {job_details.phase.name}") + + if job_details.phase == ACPJobPhase.NEGOTIATION: + # Seller has responded. Find the memo they created. + # This memo will have next_phase = TRANSACTION. + found_seller_memo = False + for memo in reversed(job_details.memos): # Check latest memo first + # Find the seller's response memo. + if ACPJobPhase(memo.next_phase) == ACPJobPhase.TRANSACTION: + memo_to_sign_for_payment_id = memo.id + print(f"Seller accepted. Buyer will sign seller's memo {memo_to_sign_for_payment_id} for payment.") + found_seller_memo = True + break + if found_seller_memo: + break # Exit while loop + + elif job_details.phase == ACPJobPhase.REJECTED: + print(f"Job {onchain_job_id} was rejected by seller.") + return # Exit main function + elif job_details.phase == ACPJobPhase.REQUEST: + print(f"Job {onchain_job_id} still in REQUEST phase. Waiting for seller...") + + if not memo_to_sign_for_payment_id: + print(f"Could not identify seller's acceptance memo for job {onchain_job_id}. Exiting.") + return + + # 3. Pay for the Job + print(f"\nPaying for job {onchain_job_id} (Price: {SERVICE_PRICE}), by signing memo {memo_to_sign_for_payment_id}...") + try: + buyer_acp.pay_for_job( + job_id=onchain_job_id, + memo_id=memo_to_sign_for_payment_id, + amount=SERVICE_PRICE, + reason="Payment for meme generation service." + ) + print(f"Payment process initiated for job {onchain_job_id}.") + except Exception as e: + print(f"Error paying for job {onchain_job_id}: {e}") + return + + # 4. Wait for Job Completion + print(f"\nWaiting for job {onchain_job_id} to be completed by evaluator...") + while True: + time.sleep(POLL_INTERVAL_SECONDS) + job_details = buyer_acp.get_job_by_onchain_id(onchain_job_id) + print(f"Polling Job {onchain_job_id}: Current Phase: {job_details.phase.name}") + + if job_details.phase == ACPJobPhase.COMPLETED: + print(f"Job {onchain_job_id} successfully COMPLETED!") + break + elif job_details.phase == ACPJobPhase.REJECTED: + print(f"Job {onchain_job_id} was REJECTED during/after evaluation.") + break + elif job_details.phase == ACPJobPhase.EVALUATION: + print(f"Job {onchain_job_id} is in EVALUATION. Waiting for evaluator's decision...") + elif job_details.phase == ACPJobPhase.TRANSACTION: + print(f"Job {onchain_job_id} is in TRANSACTION. Waiting for seller to deliver...") + # else, keep polling for other phases + + print("\n--- Buyer Script Finished ---") + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/examples/client_workflows/evaluator.py b/examples/client_workflows/evaluator.py new file mode 100644 index 0000000..a7ac637 --- /dev/null +++ b/examples/client_workflows/evaluator.py @@ -0,0 +1,90 @@ +import os +import time +from typing import List +from dotenv import load_dotenv + +from virtuals_acp import VirtualsACP, ACPJob, ACPJobPhase +from virtuals_acp.configs import BASE_SEPOLIA_CONFIG + +load_dotenv(override=True) + +EVALUATOR_PRIVATE_KEY = os.environ.get("WHITELISTED_WALLET_PRIVATE_KEY") +EVALUATOR_WALLET_ADDRESS = os.environ.get("EVALUATOR_AGENT_WALLET_ADDRESS") + +POLL_INTERVAL_SECONDS = 20 + +if not all([EVALUATOR_PRIVATE_KEY, EVALUATOR_WALLET_ADDRESS]): + print("Error: Ensure EVALUATOR_PRIVATE_KEY and EVALUATOR_WALLET_ADDRESS are set.") + exit(1) + +def main(): + print("--- Evaluator Script (Simplified Direct Client Usage) ---") + evaluator_acp = VirtualsACP( + wallet_private_key=EVALUATOR_PRIVATE_KEY, + agent_wallet_address=EVALUATOR_WALLET_ADDRESS, + config=BASE_SEPOLIA_CONFIG + ) + print(f"Evaluator ACP Initialized. Agent: {evaluator_acp.agent_address}") + + evaluated_deliverables = set() # Store (job_id, deliverable_memo_id) to avoid re-evaluation + + while True: + print(f"\nEvaluator: Polling for jobs assigned to {EVALUATOR_WALLET_ADDRESS} requiring evaluation...") + active_jobs_list: List[ACPJob] = evaluator_acp.get_active_jobs() + + if not active_jobs_list: + print("Evaluator: No active jobs found in this poll.") + time.sleep(POLL_INTERVAL_SECONDS) + continue + + for job_summary in active_jobs_list: + onchain_job_id = job_summary.id + + try: + job_details = evaluator_acp.get_job_by_onchain_id(onchain_job_id) + current_phase = job_details.phase + + if current_phase == ACPJobPhase.EVALUATION: + print(f"Evaluator: Found Job {onchain_job_id} in EVALUATION phase.") + + # Find the seller's deliverable memo. Its next_phase should be COMPLETED. + seller_deliverable_memo_to_sign = None + for memo in reversed(job_details.memos): # Check latest first + if ACPJobPhase(memo.next_phase) == ACPJobPhase.COMPLETED: + seller_deliverable_memo_to_sign = memo + break + + if seller_deliverable_memo_to_sign: + deliverable_key = (onchain_job_id, seller_deliverable_memo_to_sign.id) + if deliverable_key in evaluated_deliverables: + # print(f"Deliverable memo {seller_deliverable_memo_to_sign.id} for job {onchain_job_id} already processed.") + continue + + print(f" Job {onchain_job_id}: Found deliverable memo {seller_deliverable_memo_to_sign.id} to evaluate.") + print(f" Deliverable Content: {seller_deliverable_memo_to_sign.content}") + + # Simple evaluation logic: always accept + accept_the_delivery = True + evaluation_reason = "Deliverable looks great, approved!" + + print(f" Job {onchain_job_id}: Evaluating... Accepting: {accept_the_delivery}") + evaluator_acp.evaluate_job_delivery( + memo_id_of_deliverable=seller_deliverable_memo_to_sign.id, + accept=accept_the_delivery, + reason=evaluation_reason + ) + print(f" Job {onchain_job_id}: Evaluation submitted for memo {seller_deliverable_memo_to_sign.id}.") + evaluated_deliverables.add(deliverable_key) + else: + print(f" Job {onchain_job_id} in EVALUATION, but no deliverable memo (next_phase=COMPLETED) found yet.") + elif current_phase in [ACPJobPhase.COMPLETED, ACPJobPhase.REJECTED]: + print(f"Evaluator: Job {onchain_job_id} is already in {current_phase.name}. No action.") + # Potentially add to a "handled" set if not using evaluated_deliverables + + except Exception as e: + print(f"Evaluator: Error processing job {onchain_job_id}: {e}") + + time.sleep(POLL_INTERVAL_SECONDS) + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/examples/client_workflows/seller.py b/examples/client_workflows/seller.py new file mode 100644 index 0000000..d092544 --- /dev/null +++ b/examples/client_workflows/seller.py @@ -0,0 +1,121 @@ +import os +import time +import json +from typing import List +from dotenv import load_dotenv + +from virtuals_acp import VirtualsACP, ACPJob, ACPJobPhase +from virtuals_acp.configs import BASE_SEPOLIA_CONFIG + +load_dotenv(override=True) + +SELLER_PRIVATE_KEY = os.environ.get("WHITELISTED_WALLET_PRIVATE_KEY") +SELLER_WALLET_ADDRESS = os.environ.get("SELLER_AGENT_WALLET_ADDRESS") + +if not all([SELLER_PRIVATE_KEY, SELLER_WALLET_ADDRESS]): + print("Error: Ensure SELLER_PRIVATE_KEY and SELLER_WALLET_ADDRESS are set.") + exit(1) + +DELIVERABLE_CONTENT = { + "type": "url", + "value": "https://virtuals.io/delivered_meme.png", + "notes": "Your meme is ready!" +} + +POLL_INTERVAL_SECONDS = 20 + +def main(): + print("--- Seller Script ---") + seller_acp = VirtualsACP( + wallet_private_key=SELLER_PRIVATE_KEY, + agent_wallet_address=SELLER_WALLET_ADDRESS, + config=BASE_SEPOLIA_CONFIG + ) + print(f"Seller ACP Initialized. Agent: {seller_acp.agent_address}") + + # Keep track of jobs to avoid reprocessing in this simple loop + # job_id: {"responded_to_request": bool, "delivered_work": bool} + processed_job_stages = {} + + while True: + print(f"\nSeller: Polling for active jobs for {SELLER_WALLET_ADDRESS}...") + active_jobs_list: List[ACPJob] = seller_acp.get_active_jobs() + + if not active_jobs_list: + print("Seller: No active jobs found in this poll.") + time.sleep(POLL_INTERVAL_SECONDS) + continue + + for job_summary in active_jobs_list: + onchain_job_id = job_summary.id + + # Ensure this job is for the current seller + if job_summary.provider_address != seller_acp.agent_address: + continue + + job_stages = processed_job_stages.get(onchain_job_id, {}) + + try: + # Fetch full details to get current phase and memos + job_details = seller_acp.get_job_by_onchain_id(onchain_job_id) + current_phase = job_details.phase + print(f"Seller: Checking job {onchain_job_id}. Current Phase: {current_phase.name}") + + # 1. Respond to Job Request (if not already responded) + if current_phase == ACPJobPhase.REQUEST and not job_stages.get("responded_to_request"): + # Buyer's initial memo will have next_phase = NEGOTIATION + buyers_initial_memo_to_sign = None + for memo in reversed(job_details.memos): + if ACPJobPhase(memo.next_phase) == ACPJobPhase.NEGOTIATION: + buyers_initial_memo_to_sign = memo + break + + if buyers_initial_memo_to_sign: + print(f"Seller: Job {onchain_job_id} is in REQUEST. Responding to buyer's memo {buyers_initial_memo_to_sign.id}...") + seller_acp.respond_to_job_memo( + job_id=onchain_job_id, + memo_id=buyers_initial_memo_to_sign.id, # ID of the memo created by Buyer + accept=True, + reason="Seller accepts the job offer." + ) + print(f"Seller: Accepted job {onchain_job_id}. Job phase should move to NEGOTIATION.") + job_stages["responded_to_request"] = True + else: + print(f"Seller: Job {onchain_job_id} in REQUEST, but could not find buyer's initial memo.") + + # 2. Submit Deliverable (if job is paid and not yet delivered) + elif current_phase == ACPJobPhase.TRANSACTION and not job_stages.get("delivered_work"): + # Buyer has paid, job is in TRANSACTION. Seller needs to deliver. + # The latest memo from buyer would have next_phase = EVALUATION + buyers_payment_confirmation_memo = None + for memo in reversed(job_details.memos): + if ACPJobPhase(memo.next_phase) == ACPJobPhase.EVALUATION: + buyers_payment_confirmation_memo = memo + break + + if buyers_payment_confirmation_memo: + print(f"Seller: Job {onchain_job_id} is PAID (TRANSACTION phase). Submitting deliverable...") + seller_acp.submit_job_deliverable( + job_id=onchain_job_id, + deliverable_content=json.dumps(DELIVERABLE_CONTENT) + ) + print(f"Seller: Deliverable submitted for job {onchain_job_id}. Job should move to EVALUATION.") + job_stages["delivered_work"] = True + else: + print(f"Seller: Job {onchain_job_id} in TRANSACTION, but couldn't find buyer's payment memo to confirm.") + + elif current_phase in [ACPJobPhase.EVALUATION, ACPJobPhase.COMPLETED, ACPJobPhase.REJECTED]: + print(f"Seller: Job {onchain_job_id} is in {current_phase.name}. No further action for seller.") + # Mark as fully handled for this script + job_stages["responded_to_request"] = True + job_stages["delivered_work"] = True + + processed_job_stages[onchain_job_id] = job_stages + + except Exception as e: + print(f"Seller: Error processing job {onchain_job_id}: {e}") + + time.sleep(POLL_INTERVAL_SECONDS) + +if __name__ == "__main__": + main() \ No newline at end of file