Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: SCP (Samsung Cloud Platform) Support Contribution #926

Merged
merged 6 commits into from
Jan 9, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,13 @@ Copy https://github.com/skyplane-project/skyplane/blob/main/skyplane/compute/ibm
into `~/.bluemix/ibm_credentials` and fill your
IBM IAM key and credentials to your IBM Cloud object storage

---> For SCP:
$ # Create directory if required
$ mkdir -p ~/.scp
$ # Add the lines for "access_key", "secret_key", and "project_id" to scp_credential file
$ echo "access_key = <your_access_key>" >> ~/.scp/scp_credential
$ echo "secret_key = <your_secret_key>" >> ~/.scp/scp_credential
$ echo "project_id = <your_project_id>" >> ~/.scp/scp_credential

```
After authenticating with each cloud provider, you can run `skyplane init` to create a configuration file for Skyplane.
Expand Down Expand Up @@ -149,6 +156,11 @@ $ skyplane init
Enter the GCP project ID [XXXXXXX]:
GCP region config file saved to /home/ubuntu/.skyplane/gcp_config

(4) Configuring SCP:
Loaded SCP credentials from the scp_credential file [access key: ...XXXXXX]
SCP region config file saved to /home/ubuntu/.skyplane/scp_config


Config file saved to /home/ubuntu/.skyplane/config
```

Expand Down
1 change: 1 addition & 0 deletions skyplane/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,6 @@
"AWSConfig",
"AzureConfig",
"GCPConfig",
"SCPConfig",
"TransferHook",
]
5 changes: 4 additions & 1 deletion skyplane/api/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from skyplane.api.pipeline import Pipeline

if TYPE_CHECKING:
from skyplane.api.config import AWSConfig, AzureConfig, GCPConfig, TransferConfig, IBMCloudConfig
from skyplane.api.config import AWSConfig, AzureConfig, GCPConfig, TransferConfig, IBMCloudConfig, SCPConfig


class SkyplaneClient:
Expand All @@ -26,6 +26,7 @@ def __init__(
azure_config: Optional["AzureConfig"] = None,
gcp_config: Optional["GCPConfig"] = None,
ibmcloud_config: Optional["IBMCloudConfig"] = None,
scp_config: Optional["SCPConfig"] = None,
transfer_config: Optional[TransferConfig] = None,
log_dir: Optional[str] = None,
):
Expand All @@ -48,6 +49,7 @@ def __init__(
self.azure_auth = azure_config.make_auth_provider() if azure_config else None
self.gcp_auth = gcp_config.make_auth_provider() if gcp_config else None
self.ibmcloud_auth = ibmcloud_config.make_auth_provider() if ibmcloud_config else None
self.scp_auth = scp_config.make_auth_provider() if scp_config else None
self.transfer_config = transfer_config if transfer_config else TransferConfig()
self.log_dir = (
tmp_log_dir / "transfer_logs" / f"{datetime.now().strftime('%Y%m%d_%H%M%S')}-{uuid.uuid4().hex[:8]}"
Expand All @@ -66,6 +68,7 @@ def __init__(
azure_auth=self.azure_auth,
gcp_auth=self.gcp_auth,
ibmcloud_auth=self.ibmcloud_auth,
scp_auth=self.scp_auth,
)

def pipeline(self, planning_algorithm: Optional[str] = "direct", max_instances: Optional[int] = 1, debug=False):
Expand Down
19 changes: 18 additions & 1 deletion skyplane/api/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from skyplane import compute

from skyplane.config_paths import aws_quota_path, gcp_quota_path, azure_standardDv5_quota_path
from skyplane.config_paths import aws_quota_path, gcp_quota_path, azure_standardDv5_quota_path, scp_quota_path
from pathlib import Path


Expand Down Expand Up @@ -61,6 +61,17 @@ def make_auth_provider(self) -> compute.IBMCloudAuthentication:
# pytype: enable=attribute-error


@dataclass
class SCPConfig(AuthenticationConfig):
    """Authentication configuration for Samsung Cloud Platform (SCP).

    Mirrors the other per-cloud config dataclasses (AWSConfig, AzureConfig,
    GCPConfig): holds raw credentials and builds the matching compute
    authentication provider.
    """

    # API access key — presumably read from ~/.scp/scp_credential; confirm with loader
    scp_access_key: Optional[str] = None
    # API secret key paired with the access key
    scp_secret_key: Optional[str] = None
    # SCP project identifier that resources are provisioned under
    scp_project_id: Optional[str] = None
    # True once SCP credentials have been successfully configured
    scp_enabled: bool = False

    def make_auth_provider(self) -> compute.SCPAuthentication:
        """Return an SCPAuthentication built from this config object."""
        return compute.SCPAuthentication(config=self)  # type: ignore


@dataclass(frozen=True)
class TransferConfig:
autoterminate_minutes: int = 15
Expand All @@ -82,16 +93,22 @@ class TransferConfig:
azure_use_spot_instances: bool = False
gcp_use_spot_instances: bool = False
ibmcloud_use_spot_instances: bool = False
# Add SCP Support
scp_use_spot_instances: bool = False

aws_instance_class: str = "m5.8xlarge"
azure_instance_class: str = "Standard_D2_v5"
gcp_instance_class: str = "n2-standard-16"
ibmcloud_instance_class: str = "bx2-2x8"
gcp_use_premium_network: bool = True
# Add SCP Support
scp_instance_class: str = "h1v32m128"

aws_vcpu_file: Path = aws_quota_path
gcp_vcpu_file: Path = gcp_quota_path
azure_vcpu_file: Path = azure_standardDv5_quota_path
# Add SCP Support
scp_vcpu_file: Path = scp_quota_path
# TODO: add ibmcloud when the quota info is available

# multipart config
Expand Down
10 changes: 8 additions & 2 deletions skyplane/api/dataplane.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from skyplane.utils import logger
from skyplane.utils.definitions import gateway_docker_image, tmp_log_dir
from skyplane.utils.fn import PathLike, do_parallel
from skyplane.utils.retry import retry_backoff

if TYPE_CHECKING:
from skyplane.api.provisioner import Provisioner
Expand Down Expand Up @@ -156,6 +157,7 @@ def provision(
is_azure_used = any(n.region_tag.startswith("azure:") for n in self.topology.get_gateways())
is_gcp_used = any(n.region_tag.startswith("gcp:") for n in self.topology.get_gateways())
is_ibmcloud_used = any(n.region_tag.startswith("ibmcloud:") for n in self.topology.get_gateways())
is_scp_used = any(n.region_tag.startswith("scp:") for n in self.topology.get_gateways())

# create VMs from the topology
for node in self.topology.get_gateways():
Expand All @@ -172,7 +174,7 @@ def provision(
)

# initialize clouds
self.provisioner.init_global(aws=is_aws_used, azure=is_azure_used, gcp=is_gcp_used, ibmcloud=is_ibmcloud_used)
self.provisioner.init_global(aws=is_aws_used, azure=is_azure_used, gcp=is_gcp_used, ibmcloud=is_ibmcloud_used, scp=is_scp_used)

# provision VMs
uuids = self.provisioner.provision(
Expand Down Expand Up @@ -273,9 +275,13 @@ def deprovision(self, max_jobs: int = 64, spinner: bool = False):
def check_error_logs(self) -> Dict[str, List[str]]:
"""Get the error log from remote gateways if there is any error."""

def http_pool_request(instance):
return self.http_pool.request("GET", f"{instance.gateway_api_url}/api/v1/errors")

def get_error_logs(args):
_, instance = args
reply = self.http_pool.request("GET", f"{instance.gateway_api_url}/api/v1/errors")
# reply = self.http_pool.request("GET", f"{instance.gateway_api_url}/api/v1/errors")
reply = retry_backoff(partial(http_pool_request, instance))
if reply.status != 200:
raise Exception(f"Failed to get error logs from gateway instance {instance.instance_name()}: {reply.data.decode('utf-8')}")
return json.loads(reply.data.decode("utf-8"))["errors"]
Expand Down
2 changes: 2 additions & 0 deletions skyplane/api/obj_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ def create_bucket(self, region: str, bucket_name: str):
return f"s3://{bucket_name}"
elif provider == "gcp":
return f"gs://{bucket_name}"
elif provider == "scp":
return f"scp://{bucket_name}"
else:
raise NotImplementedError(f"Provider {provider} not implemented")

Expand Down
56 changes: 55 additions & 1 deletion skyplane/api/provisioner.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ def __init__(
gcp_auth: Optional[compute.GCPAuthentication] = None,
host_uuid: Optional[str] = None,
ibmcloud_auth: Optional[compute.IBMCloudAuthentication] = None,
scp_auth: Optional[compute.SCPAuthentication] = None,
):
"""
:param aws_auth: authentication information for aws
Expand All @@ -64,12 +65,15 @@ def __init__(
:type host_uuid: string
:param ibmcloud_auth: authentication information for aws
:type ibmcloud_auth: compute.IBMCloudAuthentication
:param scp_auth: authentication information for scp
:type scp_auth: compute.SCPAuthentication
"""
self.aws_auth = aws_auth
self.azure_auth = azure_auth
self.gcp_auth = gcp_auth
self.host_uuid = host_uuid
self.ibmcloud_auth = ibmcloud_auth
self.scp_auth = scp_auth
self._make_cloud_providers()
self.temp_nodes: Set[compute.Server] = set() # temporary area to store nodes that should be terminated upon exit
self.pending_provisioner_tasks: List[ProvisionerTask] = []
Expand All @@ -85,8 +89,9 @@ def _make_cloud_providers(self):
self.azure = compute.AzureCloudProvider(auth=self.azure_auth)
self.gcp = compute.GCPCloudProvider(auth=self.gcp_auth)
self.ibmcloud = compute.IBMCloudProvider(auth=self.ibmcloud_auth)
self.scp = compute.SCPCloudProvider(auth=self.scp_auth)

def init_global(self, aws: bool = True, azure: bool = True, gcp: bool = True, ibmcloud: bool = True):
def init_global(self, aws: bool = True, azure: bool = True, gcp: bool = True, ibmcloud: bool = True, scp: bool = True):
"""
Initialize the global cloud providers by configuring with credentials

Expand All @@ -110,6 +115,9 @@ def init_global(self, aws: bool = True, azure: bool = True, gcp: bool = True, ib
jobs.append(self.gcp.setup_global)
if ibmcloud:
jobs.append(self.ibmcloud.setup_global)
if scp:
jobs.append(self.scp.create_ssh_key)
jobs.append(self.scp.setup_global)

do_parallel(lambda fn: fn(), jobs, spinner=False)

Expand Down Expand Up @@ -174,6 +182,10 @@ def _provision_task(self, task: ProvisionerTask):
elif task.cloud_provider == "ibmcloud":
assert self.ibmcloud.auth.enabled(), "IBM Cloud credentials not configured"
server = self.ibmcloud.provision_instance(task.region, task.vm_type, tags=task.tags)
elif task.cloud_provider == "scp":
assert self.scp.auth.enabled(), "SCP credentials not configured"
# print('def _provision_task : ', task.region, task.vm_type, task.tags)
server = self.scp.provision_instance(task.region, task.vm_type, tags=task.tags)
else:
raise NotImplementedError(f"Unknown provider {task.cloud_provider}")
logger.fs.debug(f"[Provisioner._provision_task] Provisioned {server} in {t.elapsed:.2f}s")
Expand Down Expand Up @@ -206,6 +218,8 @@ def provision(self, authorize_firewall: bool = True, max_jobs: int = 16, spinner
azure_provisioned = any([task.cloud_provider == "azure" for task in provision_tasks])
gcp_provisioned = any([task.cloud_provider == "gcp" for task in provision_tasks])
ibmcloud_provisioned = any([task.cloud_provider == "ibmcloud" for task in provision_tasks])
scp_regions = set([task.region for task in provision_tasks if task.cloud_provider == "scp"])
scp_provisioned = any([task.cloud_provider == "scp" for task in provision_tasks])

# configure regions
if aws_provisioned:
Expand All @@ -224,6 +238,25 @@ def provision(self, authorize_firewall: bool = True, max_jobs: int = 16, spinner
)
logger.fs.info(f"[Provisioner.provision] Configured IBM Cloud regions {ibmcloud_regions}")

if scp_provisioned:
logger.fs.info("SCP provisioning may sometimes take several minutes. Please be patient.")
do_parallel(
self.scp.setup_region,
list(set(scp_regions)),
spinner=spinner,
spinner_persist=False,
desc="Configuring SCP regions",
)
# server group create, add provision_tasks on tags(region)
for r in set(scp_regions):
servergroup = self.scp.network.create_server_group(r)
for task in provision_tasks:
if task.cloud_provider == "scp" and task.region == r:
task.tags["servergroup"] = servergroup
# print('provisioner.py - task.tags : ', task.tags)

logger.fs.info(f"[Provisioner.provision] Configured SCP regions {scp_regions}")

# provision VMs
logger.fs.info(f"[Provisioner.provision] Provisioning {len(provision_tasks)} VMs")
results: List[Tuple[ProvisionerTask, compute.Server]] = do_parallel(
Expand Down Expand Up @@ -253,6 +286,18 @@ def authorize_gcp_gateways():
self.gcp_firewall_rules.add(self.gcp.authorize_gateways(public_ips + private_ips))

authorize_ip_jobs.append(authorize_gcp_gateways)
if scp_provisioned:
# configure firewall for each scp region
for r in set(scp_regions):
scp_ips = [s.private_ip() for t, s in results if t.cloud_provider == "scp" and t.region == r]
# vpcids = [s.vpc_id for t, s in results if t.cloud_provider == "scp" and t.region == r] # pytype: disable=bad-return-type
vpcids = [
s.vpc_id if isinstance(s, compute.SCPServer) else None
for t, s in results
if t.cloud_provider == "scp" and t.region == r
]
# print('provisioner.py - scp_ips : ', scp_ips, ', vpcids : ', vpcids)
authorize_ip_jobs.extend([partial(self.scp.add_firewall_rule_all, r, scp_ips, vpcids)])

do_parallel(
lambda fn: fn(),
Expand Down Expand Up @@ -303,6 +348,7 @@ def deprovision_gateway_instance(server: compute.Server):
azure_deprovisioned = any([s.provider == "azure" for s in servers])
gcp_deprovisioned = any([s.provider == "gcp" for s in servers])
ibmcloud_deprovisioned = any([s.provider == "ibmcloud" for s in servers])
scp_deprovisioned = any([s.provider == "scp" for s in servers])
if azure_deprovisioned:
logger.warning("Azure deprovisioning is very slow. Please be patient.")
logger.fs.info(f"[Provisioner.deprovision] Deprovisioning {len(servers)} VMs")
Expand All @@ -327,6 +373,14 @@ def deprovision_gateway_instance(server: compute.Server):
if gcp_deprovisioned:
jobs.extend([partial(self.gcp.remove_gateway_rule, rule) for rule in self.gcp_firewall_rules])
logger.fs.info(f"[Provisioner.deprovision] Deauthorizing GCP gateways with firewalls: {self.gcp_firewall_rules}")
if scp_deprovisioned:
scp_regions = set([s.region() for s in servers if s.provider == "scp"])
for r in set(scp_regions):
scp_servers = [s for s in servers if s.provider == "scp" and s.region() == r]
scp_ips = [s.private_ip() for s in scp_servers]
vpcids = [s.vpc_id for s in scp_servers]
jobs.extend([partial(self.scp.remove_gateway_rule_region, r, scp_ips, vpcids)])
logger.fs.info(f"[Provisioner.deprovision] Deauthorizing SCP gateways with firewalls: {scp_ips}")
do_parallel(
lambda fn: fn(), jobs, n=max_jobs, spinner=spinner, spinner_persist=False, desc="Deauthorizing gateways from firewalls"
)
8 changes: 7 additions & 1 deletion skyplane/api/tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from abc import ABC
from datetime import datetime
from threading import Thread
from functools import partial

import urllib3
from typing import TYPE_CHECKING, Dict, List, Optional, Set
Expand All @@ -16,6 +17,7 @@
from skyplane.utils.fn import do_parallel
from skyplane.api.usage import UsageClient
from skyplane.utils.definitions import GB
from skyplane.utils.retry import retry_backoff

from skyplane.cli.impl.common import print_stats_completed

Expand Down Expand Up @@ -335,10 +337,14 @@ def monitor_transfer(pd, self, region_tag):
def _chunk_to_job_map(self):
return {chunk_id: job_uuid for job_uuid, cr_dict in self.job_chunk_requests.items() for chunk_id in cr_dict.keys()}

def http_pool_request(self, instance):
    # Single GET against the gateway's chunk-status endpoint. Factored out as a
    # named method so callers can wrap it with retry_backoff via functools.partial
    # instead of issuing the request inline.
    return self.http_pool.request("GET", f"{instance.gateway_api_url}/api/v1/chunk_status_log")

def _query_chunk_status(self):
def get_chunk_status(args):
node, instance = args
reply = self.http_pool.request("GET", f"{instance.gateway_api_url}/api/v1/chunk_status_log")
# reply = self.http_pool.request("GET", f"{instance.gateway_api_url}/api/v1/chunk_status_log")
reply = retry_backoff(partial(self.http_pool_request, instance))
if reply.status != 200:
raise Exception(
f"Failed to get chunk status from gateway instance {instance.instance_name()}: {reply.data.decode('utf-8')}"
Expand Down
Loading