Files
bale/snapper/interfaces/ssh.py
Natan Keddem 9a2c2c2273 initial
2023-11-04 00:00:00 -04:00

128 lines
4.9 KiB
Python

from typing import Dict, Union
import os
import asyncio
from pathlib import Path
from snapper.result import Result
from snapper.interfaces.cli import Cli
def get_hosts(path):
path = f"{Path(path).resolve()}/config"
hosts = []
try:
with open(path, "r", encoding="utf-8") as f:
lines = f.readlines()
for line in lines:
if line.startswith("Host "):
hosts.append(line.split(" ")[1].strip())
return hosts
except FileNotFoundError:
return []
async def get_public_key(path: str) -> str:
path = Path(path).resolve()
if "id_rsa.pub" not in os.listdir(path) or "id_rsa" not in os.listdir(path):
await Cli().shell(f"""ssh-keygen -t rsa -N "" -f {path}/id_rsa""")
with open(f"{path}/id_rsa.pub", "r", encoding="utf-8") as reader:
return reader.read()
class Ssh:
def __init__(
self, path: str, host: str, hostname: str = "", username: str = "", password: Union[str, None] = None, seperator: bytes = b"\n"
) -> None:
self._raw_path: str = path
self._path: Path = Path(path).resolve()
self.host: str = host
self.password: Union[str, None] = password
self.use_key: bool = False
if password is None:
self.use_key = True
self._key_path: str = f"{self._path}/id_rsa"
self._base_cmd: str = ""
self._full_cmd: str = ""
self._cli = Cli(seperator=seperator)
self._config_path: str = f"{self._path}/config"
self._config: Dict[str, Dict[str, str]] = {}
self.read_config()
self.hostname: str = hostname or self._config.get(host, {}).get("HostName", "")
self.username: str = username or self._config.get(host, {}).get("User", "")
self.set_config()
def read_config(self) -> None:
try:
with open(self._config_path, "r", encoding="utf-8") as f:
lines = f.readlines()
for line in lines:
line = line.strip()
if line == "" or line.startswith("#"):
continue
if line.startswith("Host "):
current_host = line.split(" ")[1].strip()
self._config[current_host] = {}
else:
key, value = line.split(" ", 1)
self._config[current_host][key.strip()] = value.strip()
except FileNotFoundError:
self._config = {}
def write_config(self) -> None:
with open(self._config_path, "w", encoding="utf-8") as f:
for host, config in self._config.items():
f.write(f"Host {host}\n")
for key, value in config.items():
f.write(f" {key} {value}\n")
f.write("\n")
def set_config(self) -> None:
self._config[self.host] = {
"IdentityFile": self._key_path,
"PasswordAuthentication": "no",
"StrictHostKeychecking": "no",
"IdentitiesOnly": "yes",
}
if self.hostname != "":
self._config[self.host]["HostName"] = self.hostname
if self.username != "":
self._config[self.host]["User"] = self.username
self.write_config()
def remove(self) -> None:
del self._config[self.host]
self.write_config()
async def execute(self, command: str) -> Result:
# self._base_cmd = (
# f"{'' if self.use_key else f'sshpass -p {self.password} '}"
# f"ssh -o IdentitiesOnly=yes"
# f" -F {self._config_path}"
# f" -o StrictHostKeychecking=no"
# f" {self.host}"
# )
self._base_cmd = f"{'' if self.use_key else f'sshpass -p {self.password} '} ssh -F {self._config_path} {self.host}"
self._full_cmd = f"{self._base_cmd} {command}"
# proc = await asyncio.create_subprocess_shell(
# self._full_cmd, shell=True, stderr=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE
# )
# stdout, stderr = await proc.communicate()
# result = self._cli.execute(self._full_cmd)
# return Result(command=self._full_cmd, stdout=stdout.decode(), stderr=stderr.decode())
return await self._cli.execute(self._full_cmd)
async def send_key(self) -> Result:
await get_public_key(self._raw_path)
cmd = (
f"sshpass -p {self.password} "
f"ssh-copy-id -o IdentitiesOnly=yes -i {self._key_path} "
f"-o StrictHostKeychecking=no {self.username}@{self.hostname}"
)
# proc = await asyncio.create_subprocess_shell(cmd, shell=True, stderr=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE)
# stdout, stderr = await proc.communicate()
# return Result(command=cmd, stdout=stdout.decode(), stderr=stderr.decode())
return await self._cli.execute(cmd)
@property
def config_path(self):
return self._config_path