# Source code for esm_archiving.external.pypftp

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from __future__ import print_function

# -*- coding: utf-8 -*-

# pypftp.py

"""
python interface to HPSS's pftp

"""

import os
import re
import sys
import time
import netrc
import ftplib
import getpass
import fnmatch
import posixpath
import subprocess as sp

from collections import defaultdict
from concurrent.futures import ProcessPoolExecutor


# Host name and port that connect() passes to ftplib.FTP.connect().
HOST = "tape.dkrz.de"
PORT = 4021


# Names exported by a star-import of this module.
__all__ = ["upload", "download", "Pftp"]


def connect(username=None, password=None):
    """Open and log in an ``ftplib.FTP`` session to ``HOST:PORT``.

    Parameters
    ----------
    username : str, optional
        Login name. Defaults to the name of the current user.
    password : str, optional
        Password. When omitted, the credentials are read from the user's
        ``.netrc`` entry for ``HOST``; the login name stored there replaces
        ``username``.

    Returns
    -------
    ftplib.FTP
        The connected, logged-in session.

    Notes
    -----
    If the ``.netrc`` file cannot be read or parsed, or holds no entry for
    ``HOST``, a message is printed and the process exits with status 1.
    """
    username = username or getpass.getuser()
    if password is None:
        # Resolve the credentials before opening a socket so that a broken
        # .netrc does not leave a half-open connection behind.
        try:
            credentials = netrc.netrc().authenticators(HOST)
        except netrc.NetrcParseError as e:
            print("Could not parse .netrc file: {0}".format(e))
            sys.exit(1)
        except IOError as e:
            print("I/O error: {0}".format(e))
            sys.exit(1)
        if credentials is None:
            # authenticators() returns None when the host has no entry.
            print("No entry for {0} found in .netrc file".format(HOST))
            sys.exit(1)
        username, _account, password = credentials

    ftp_obj = ftplib.FTP()
    ftp_obj.connect(HOST, PORT)
    try:
        ftp_obj.login(username, password)
    except Exception:
        # Do not leak the socket when the login is rejected.
        ftp_obj.close()
        raise
    return ftp_obj


# Matcher for server reply lines that start with a numeric code, e.g.
# "213-status of ..." or "226 Transfer complete". A raw string is used so
# that "\s" reaches the regex engine instead of being an invalid string escape.
_msg_re = re.compile(r"^[0-9]+-?\s?[A-Za-z].*").match


def filter_msg(lines):
    """Yield the non-empty lines that are *not* numeric server reply lines.

    ``lines`` may be a single string (split with ``splitlines()``) or any
    iterable of lines.
    """
    try:
        rows = lines.splitlines()
    except AttributeError:
        # Not a string-like object: iterate it as given.
        rows = lines
    for row in rows:
        if not row:
            continue
        if _msg_re(row):
            continue
        yield row


def msgs(lines):
    """Yield only the lines that look like numeric server reply lines.

    ``lines`` may be a single string (split with ``splitlines()``) or any
    iterable of lines.
    """
    try:
        rows = lines.splitlines()
    except AttributeError:
        # Not a string-like object: iterate it as given.
        rows = lines
    for row in rows:
        if not _msg_re(row):
            continue
        yield row


class Pftp(object):
    """Convenience wrapper around an ``ftplib.FTP`` session opened by ``connect()``.

    The instance can be used as a context manager; leaving the ``with`` block
    calls :meth:`close`.
    """

    HOST = HOST
    PORT = PORT

    def __init__(self, username=None, password=None):
        """Connect and log in; ``username`` defaults to the current user."""
        self.username = username = username or getpass.getuser()
        self.ftp_obj = connect(username, password)
        # Kept so that reconnect() can log in again with the same arguments.
        self._auth = {"username": username, "password": password}

    def __repr__(self):
        is_active = self.is_connected()
        status = "alive" if is_active else "stale"
        s = (
            "<{0.__class__.__name__} connected to "
            "{0.HOST}:{0.PORT} "
            "as {0.username}>"
        )
        s = s.format(self)
        s += "; connection={}".format(status)
        return repr(s)

    def is_connected(self):
        """Check if the connection is still active by sending ``NOOP``."""
        try:
            self.ftp_obj.voidcmd("NOOP")
        except Exception:
            # Any failure (protocol error, closed socket, ...) means "stale".
            return False
        return True

    def reconnect(self):
        """Drop the current session and open a new one with the stored credentials."""
        try:
            self.ftp_obj.close()
        except Exception:
            pass  # best effort: the old session is being discarded anyway
        self.ftp_obj = connect(self._auth["username"], self._auth["password"])

    def quit(self):
        """Send ``QUIT`` to the server."""
        self.ftp_obj.quit()

    def close(self):
        """Politely end the session and always release the socket.

        Errors raised while sending ``QUIT`` (for example on a connection
        that is already stale or closed) are ignored so that the socket is
        closed regardless and calling ``close()`` twice is harmless.
        """
        try:
            self.quit()
        except (AttributeError,) + tuple(ftplib.all_errors):
            # AttributeError: ftplib has no socket left to write to.
            pass
        finally:
            self.ftp_obj.close()

    def pwd(self):
        """Return the present working directory on the server."""
        return self.ftp_obj.pwd()

    def cwd(self, path):
        """Change the working directory on the server.

        A ``path`` located below the current working directory is sent to
        the server relative to it.

        Raises
        ------
        ValueError
            If the server refuses the change of directory.
        """
        try:
            current = self.pwd()
            target = path
            # Strip the working directory only as a leading path component;
            # a plain str.replace would also hit later occurrences and
            # siblings that merely share the prefix (e.g. "/a" vs "/ab").
            if current != "/" and (
                path == current or path.startswith(current.rstrip("/") + "/")
            ):
                target = path[len(current):].lstrip("/")
            self.ftp_obj.cwd(target)
        except tuple(ftplib.all_errors):
            raise ValueError("Path does not exists: " + path)

    def exists(self, path):
        """Return True if ``stat`` yields any output for ``path``."""
        try:
            result = self.stat(path)
        except Exception:
            result = None
        return bool(result)

    def stat(self, pathname):
        """Return the ``STAT`` output for ``pathname`` as a list of lines.

        Numeric reply lines are filtered out. Returns ``None`` when the
        server answers with a permanent error.
        """
        try:
            _stat = self.ftp_obj.sendcmd("STAT {}".format(pathname))
        except ftplib.error_perm:
            return None
        return list(filter_msg(_stat))

    def _first_stat_line(self, pathname):
        """Return the first ``STAT`` line of ``pathname`` or raise ValueError."""
        _stat = self.stat(pathname)
        if not _stat:
            raise ValueError("path does not exists %s" % pathname)
        return _stat[0]

    def _link_target_is_dir(self, line):
        """Return True if the link described by ``line`` can be entered with ``cwd``.

        The working directory is restored afterwards.
        """
        pwd = self.pwd()
        name = line.split()[-1]  # last column: the link target
        try:
            self.cwd(name)
        except (ValueError,) + tuple(ftplib.all_errors):
            return False
        finally:
            self.cwd(pwd)
        return True

    def size(self, pathname):
        """Return the size of ``pathname`` in bytes.

        Raises
        ------
        ValueError
            If ``pathname`` does not exist.
        """
        # NOTE(review): the size is taken from the 7th column of the first
        # STAT line -- presumably the server's listing layout; confirm.
        return int(self._first_stat_line(pathname).split()[6])

    def isfile(self, pathname):
        """Return True if ``pathname`` refers to an existing file.

        Symbolic links count as files when their target cannot be entered
        as a directory. Raises ``ValueError`` if the path does not exist.
        """
        line = self._first_stat_line(pathname)
        if line[0] == "-":
            return True
        if line[0] == "l":
            return not self._link_target_is_dir(line)
        return False

    def isdir(self, pathname):
        """Return True if ``pathname`` refers to an existing directory.

        Symbolic links count as directories when their target can be
        entered with ``cwd``. Raises ``ValueError`` if the path does not exist.
        """
        line = self._first_stat_line(pathname)
        if line[0] == "d":
            return True
        if line[0] == "l":
            return self._link_target_is_dir(line)
        return False

    def islink(self, pathname):
        """Return True if ``pathname`` refers to a symbolic link.

        Raises ``ValueError`` if the path does not exist.
        """
        return self._first_stat_line(pathname)[0] == "l"

    def listing(self, path=None):
        """List directory contents (``NLST``); defaults to the working directory."""
        path = path or self.pwd()
        return self.ftp_obj.nlst(path)

    listdir = listing

    def listing2(self, path=None):
        """Directory listing in long form, similar to ``ls -l``."""
        path = path or self.pwd()
        tmp = []
        self.ftp_obj.dir(path, tmp.append)
        return tmp

    def mlsd(self, path):
        """Yield ``(name, facts)`` pairs parsed from the ``MLSD`` output of ``path``.

        ``facts`` maps lower-cased fact names to their string values.
        """
        cmd = "MLSD %s" % path
        lines = []
        self.ftp_obj.retrlines(cmd, lines.append)
        for line in lines:
            facts, _, name = line.rstrip().partition(" ")
            entry = {}
            # facts[:-1] drops the trailing ";" before splitting.
            for fact in facts[:-1].split(";"):
                key, _, value = fact.partition("=")
                entry[key.lower()] = value
            yield (name, entry)

    def files(self, path=None):
        """Gather the files (and links to files) listed at ``path``."""
        path = path or self.pwd()
        result = []
        for entry in self.listing2(path):
            if not entry.strip():
                continue  # blank line: nothing to parse
            name = entry.split()[-1]
            if entry.startswith("-"):
                result.append(name)
            if entry.startswith("l"):
                if self.isfile(name):
                    result.append(name)
        return result

    def directories(self, path=None):
        """Gather the directories (and links to directories) listed at ``path``."""
        path = path or self.pwd()
        result = []
        for entry in self.listing2(path):
            if not entry.strip():
                continue  # blank line: nothing to parse
            name = entry.split()[-1]
            if entry.startswith("d"):
                result.append(name)
            if entry.startswith("l"):
                if self.isdir(name):
                    result.append(name)
        return result

    def walk(self, path=None):
        """Recursively walk the directory tree from ``path``, similar to ``os.walk``.

        Yields ``(path, subdirs, files)`` tuples.
        """
        path = path or self.pwd()
        _files = self.files(path)
        _subdirs = self.directories(path)
        yield (path, _subdirs, _files)
        # NOTE(review): sub-directories are visited under the name taken
        # from the long listing -- assumes that name is usable as a path
        # from the current working directory; confirm against the server.
        for subdir in _subdirs:
            for x in self.walk(subdir):
                yield x

    def walk_for_files(self, path=None):
        """Recursively yield the files found below ``path``."""
        for root, _dirs, _files in self.walk(path):
            for f in _files:
                yield f

    def walk_for_directories(self, path=None):
        """Recursively yield the directories found below ``path``."""
        for root, _dirs, _files in self.walk(path):
            for d in _dirs:
                yield d

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()

    # ========== MANIPULATIONS ==========

    def mkdir(self, path):
        """Create ``path`` unless it already exists."""
        if not self.exists(path):
            self.ftp_obj.mkd(path)
            # Report only after the server accepted the request.
            print("Created directory: " + path)

    def makedirs(self, path):
        """Recursively create dirs as required walking up to an existing parent dir"""
        try:
            _isdir = self.isdir(path)
        except ValueError:
            pass
        else:
            if _isdir:
                return
        dirname = posixpath.dirname(path)
        # dirname("/") is "/" again: stop there instead of recursing forever.
        if dirname and dirname != path:
            self.makedirs(dirname)
        self.mkdir(path)
        return

    def rmdir(self, path):
        """Remove the directory ``path`` if it exists."""
        if self.exists(path):
            self.ftp_obj.rmd(path)
            print("Removed directory: " + path)
        return

    def remove(self, filename):
        """Delete the file ``filename`` if it exists."""
        if self.exists(filename):
            self.ftp_obj.delete(filename)
            print("Deleted file: " + filename)
        return

    def removedirs(self, path):
        """Delete the files found by walking ``path``, then the directories.

        Directories are removed deepest first.
        """
        remove = self.remove
        rmdir = self.rmdir
        path = posixpath.normpath(path)
        dirs = set()
        for topdir, subdirs, files in self.walk(path):
            for _file in files:
                remove(_file)
            if subdirs:
                dirs.update(subdirs)
            dirs.add(topdir)
        dirs = sorted(dirs, key=lambda x: len(x.split(posixpath.sep)), reverse=True)
        for d in dirs:
            rmdir(d)
        return None

    def rename(self, from_name, to_name):
        """Rename ``from_name`` to ``to_name`` within the same directory.

        Raises
        ------
        ValueError
            If ``to_name`` names a different directory than ``from_name``.
        """
        # Remote paths are POSIX paths regardless of the local platform.
        target_dir = posixpath.dirname(to_name)
        if target_dir and posixpath.dirname(from_name) != target_dir:
            raise ValueError("Must rename a file within the same directory")
        self.ftp_obj.rename(from_name, to_name)
        return to_name

    # ========== FILE-TRANSFERS ==========

    @staticmethod
    def upload(source, destination):
        """Upload ``source`` using the pftp binary (module-level ``upload``)."""
        return upload(source, destination)

    @staticmethod
    def download(source, destination):
        """Download ``source`` using the pftp binary (module-level ``download``)."""
        return download(source, destination)
def printProgressBar(
    iteration, total, prefix="", suffix="", decimals=1, length=80, fill=u"+"
):
    """Draw a one-line text progress bar that overwrites itself.

    Adapted from
    https://stackoverflow.com/questions/3173320/text-progress-bar-in-the-console

    Parameters
    ----------
    iteration : number
        Amount of work done so far.
    total : number
        Amount of work in total. A total of 0 (e.g. an empty file) is shown
        as complete instead of dividing by zero.
    prefix, suffix : str
        Text printed before and after the bar.
    decimals : int
        Number of decimals of the percentage.
    length : int
        Width of the bar in characters.
    fill : str
        Character used for the completed part.
    """
    if total:
        fraction = iteration / float(total)
        filledLength = int(length * iteration // total)
    else:
        fraction = 1.0
        filledLength = length
    # Keep the bar at its nominal width even if iteration overshoots total.
    filledLength = min(length, filledLength)
    percent = ("{0:." + str(decimals) + "f}").format(100 * fraction)
    bar = fill * filledLength + "-" * (length - filledLength)
    # Leading and trailing carriage returns make the next call redraw in place.
    print("\r%s |%s| %s%% %s" % (prefix, bar, percent, suffix), end="\r")
    return


def _transfer(cmdlines):
    """Run the ``pftp`` binary and feed it ``cmdlines`` on standard input.

    NOTE(review): the output and the exit status of ``pftp`` are discarded,
    so a failed transfer is not reported to the caller.
    """
    client = sp.Popen("pftp", stdin=sp.PIPE, stdout=sp.PIPE, stderr=sp.PIPE)
    client.communicate(cmdlines.encode("utf-8"))
    return


def _file_stripe_count(filename):
    """Return the stripe count that ``lfs getstripe -c`` reports for ``filename``.

    Falls back to 1 when ``lfs`` is not installed, fails, or prints
    something that is not an integer, so that uploads also work where the
    ``lfs`` tool is unavailable.
    """
    cmd = ["lfs", "getstripe", "-c", filename]
    try:
        return int(sp.check_output(cmd))
    except (OSError, sp.CalledProcessError, ValueError):
        return 1


def _pftp_upload_commands(source, destination):
    """Build the newline-separated ``pftp`` command script that uploads ``source``.

    For files with more than one stripe the parallel width is set to the
    stripe count (capped at 8) and the block size to 1 MB.
    """
    source_filename = os.path.basename(source)
    source_dirname = os.path.dirname(source) or os.curdir
    cmd = ["prompt"]
    stripe_count = _file_stripe_count(source)
    if stripe_count > 1:
        stripe_count = min(stripe_count, 8)  # limit to 8 parallel tasks
        cmd.append("setpwidth {}".format(stripe_count))
        cmd.append("setpblocksize {}".format(1024 * 1024))  # 1MB
    if source_dirname != os.curdir:
        cmd.append("lcd {}".format(source_dirname))
    cmd.append("cd {}".format(destination))
    cmd.append("put {}".format(source_filename))
    cmd.append("bye")
    return "\n".join(cmd)
def upload(source, destination):
    """Upload the local file ``source`` to ``destination`` on tape.

    ``destination`` may be an existing remote path, a not yet existing
    directory (created on demand), or a not yet existing path whose last
    component has a file extension; in the latter case the file is uploaded
    into the parent directory and renamed afterwards.

    The transfer itself is done by the ``pftp`` binary in a worker process
    while this function polls the remote file size to draw a progress bar.

    Returns
    -------
    str
        The remote path of the uploaded file.
    """
    source = source.rstrip("/")
    destination = destination.rstrip("/")
    source_filename = os.path.basename(source)
    destination_maybe_filename = os.path.basename(destination)
    destination_dirname = os.path.dirname(destination)

    p = Pftp()
    try:
        requires_renaming = False
        if not p.exists(destination):
            if source_filename == destination_maybe_filename:
                destination = destination_dirname
            elif os.path.splitext(destination_maybe_filename)[-1]:
                # Looks like a file name: upload next to it, rename later.
                destination = destination_dirname
                requires_renaming = True
        destination_filepath = os.path.join(destination, source_filename)
        if not p.exists(destination):
            p.makedirs(destination)
        source_filesize = os.stat(source).st_size

        def get_filesize(filepath):
            # The control connection may be dropped during long transfers.
            if not p.is_connected():
                p.reconnect()
            if not p.exists(filepath):
                return 0
            return p.size(filepath)

        cmdlines = _pftp_upload_commands(source, destination)
        pool = ProcessPoolExecutor(1)
        try:
            fut = pool.submit(_transfer, cmdlines)
            # done() rather than running(): a job that is still pending is
            # not "running" yet and would end the polling loop immediately.
            while not fut.done():
                time.sleep(1)
                current_size = get_filesize(destination_filepath)
                if source_filesize:
                    printProgressBar(current_size, source_filesize, source_filename)
            fut.result()
        finally:
            pool.shutdown()

        if requires_renaming:
            p.cwd(destination_dirname)
            p.rename(source_filename, destination_maybe_filename)
    finally:
        try:
            p.close()
        except Exception:
            pass  # best effort: the session is no longer needed
    if requires_renaming:
        return os.path.join(destination, destination_maybe_filename)
    return os.path.join(destination, source_filename)
def _pftp_download_commands(source, destination):
    """Build the newline-separated ``pftp`` command script that downloads ``source``.

    The remote directory is changed only when ``source`` has a directory
    part; the local directory is always changed to ``destination`` so that
    the file lands where the caller expects it.
    """
    source_filename = os.path.basename(source)
    source_dirname = os.path.dirname(source) or os.curdir
    cmd = ["prompt"]
    if source_dirname != os.curdir:
        cmd.append("cd {}".format(source_dirname))
    cmd.append("lcd {}".format(destination))
    cmd.append("get {}".format(source_filename))
    cmd.append("bye")
    return "\n".join(cmd)
def download(source, destination):
    """Download the remote file ``source`` from tape to ``destination``.

    ``destination`` may be an existing local path, a not yet existing
    directory (created on demand), or a not yet existing path whose last
    component has a file extension; in the latter case the file is
    downloaded into the parent directory and renamed afterwards.

    The transfer itself is done by the ``pftp`` binary in a worker process
    while this function polls the local file size to draw a progress bar.

    Returns
    -------
    str
        The local path of the downloaded file.
    """
    source = source.rstrip("/")
    destination = destination.rstrip("/")
    source_filename = os.path.basename(source)
    destination_maybe_filename = os.path.basename(destination)
    destination_dirname = os.path.dirname(destination)
    requires_renaming = False
    if not os.path.exists(destination):
        if source_filename == destination_maybe_filename:
            destination = destination_dirname
        elif os.path.splitext(destination_maybe_filename)[-1]:
            # Looks like a file name: download next to it, rename later.
            destination = destination_dirname
            requires_renaming = True
    destination_filepath = os.path.join(destination, source_filename)

    # An empty directory part means "the current directory".
    local_dir = destination or os.curdir
    if not os.path.exists(local_dir):
        try:
            os.makedirs(local_dir)
        except OSError:
            # Another process may have created it meanwhile; anything else
            # is a real failure and must not be hidden.
            if not os.path.isdir(local_dir):
                raise

    with Pftp() as p:
        source_filesize = p.size(source)

    def get_filesize(filepath):
        if not os.path.exists(filepath):
            return 0
        return os.path.getsize(filepath)

    cmdlines = _pftp_download_commands(source, local_dir)
    pool = ProcessPoolExecutor(1)
    try:
        fut = pool.submit(_transfer, cmdlines)
        # done() rather than running(): a job that is still pending is not
        # "running" yet and would end the polling loop immediately.
        while not fut.done():
            time.sleep(1)
            current_size = get_filesize(destination_filepath)
            if source_filesize:
                printProgressBar(
                    current_size,
                    source_filesize,
                    os.path.basename(destination_filepath),
                )
        fut.result()
    finally:
        pool.shutdown()

    if requires_renaming:
        # Rename with explicit paths instead of changing the process-wide
        # working directory (which also failed for an empty directory part).
        os.rename(
            os.path.join(destination_dirname, source_filename),
            os.path.join(destination_dirname, destination_maybe_filename),
        )
        return os.path.join(destination, destination_maybe_filename)
    return os.path.join(destination, source_filename)
def command_line_interface():
    """Build the ``click`` command group for the tape archive and run it.

    ``click`` is imported here so that the rest of the module stays usable
    without it.
    """
    import click

    wildcard_check = re.compile("[*?[]")

    def has_wildcard(path):
        """Return True if ``path`` contains one of ``*``, ``?`` or ``[``."""
        return wildcard_check.search(path) is not None

    class WildcardInDirectoryPath(click.ParamType):
        """Path parameter that allows wildcards in the file name part only."""

        name = "PathWildcard"

        def convert(self, value, param, ctx):
            if has_wildcard(os.path.dirname(value)):
                msg = "Wildcard only in file names are allowed {}".format(value)
                self.fail(msg, param, ctx)
            return value

    WILDCARD_PATH = WildcardInDirectoryPath()

    def sizeof_fmt(num, suffix="B"):
        """Format ``num`` bytes with binary prefixes, e.g. ``1.5KiB``."""
        for unit in ("", "Ki", "Mi", "Gi", "Ti", "Pi", "Ei", "Zi"):
            if abs(num) < 1024.0:
                return "%3.1f%s%s" % (num, unit, suffix)
            num /= 1024.0
        return "%.1f%s%s" % (num, "Yi", suffix)

    def counter_format(nfiles):
        """Return a ``[ i/ n]`` format string padded to the width of ``nfiles``."""
        width = str(len(str(nfiles)))
        return "[{:>" + width + "}/{:>" + width + "}]"

    @click.group()
    def main():
        """A command line interface for HPSS (tape archive)."""

    @main.command("download", short_help="Download files from tape")
    @click.argument("src", nargs=-1, type=WILDCARD_PATH)
    @click.argument("dst", nargs=1, type=click.Path())
    def _download(src, dst):
        """Download files from tape.

        \b
        SRC: file on tape
        DST: path on mistral
        """
        files = []
        # The context manager closes the session even if a lookup fails.
        with Pftp() as p:
            for entry in src:
                basename = os.path.basename(entry)
                if has_wildcard(basename):
                    dirname = os.path.dirname(entry)
                    for fullname in p.listdir(dirname):
                        if fnmatch.fnmatch(os.path.basename(fullname), basename):
                            files.append(fullname)
                elif p.exists(entry):
                    files.append(entry)
                else:
                    click.echo("Skipping (not found): " + entry, err=True)
            total_size = sum(map(p.size, files))
        nfiles = len(files)
        click.echo(
            "{} files to download. Total size: {}".format(
                nfiles, sizeof_fmt(total_size)
            )
        )
        msg = counter_format(nfiles)
        for index, fname in enumerate(files, 1):
            downloaded_file = download(fname, dst)
            count = msg.format(index, nfiles)
            click.echo(count + " Downloaded: " + downloaded_file + " ")

    @main.command("upload", short_help="Upload files to tape")
    @click.argument("src", nargs=-1, type=click.Path())
    @click.argument("dst", nargs=1, type=click.Path())
    def _upload(src, dst):
        """Upload files to tape.

        \b
        SRC: file on mistral
        DST: path on tape
        """
        nfiles = len(src)
        total_size = sum(map(os.path.getsize, src))
        msg = counter_format(nfiles)
        click.echo(
            "{} files to upload. Total size: {}".format(nfiles, sizeof_fmt(total_size))
        )
        for index, fname in enumerate(src, 1):
            uploaded_file = upload(fname, dst)
            count = msg.format(index, nfiles)
            click.echo(count + " Uploaded: " + uploaded_file + " ")

    @main.command()
    @click.option("-l", default=False, is_flag=True, help="long listing")
    @click.option("-r", default=False, is_flag=True, help="reverse listing")
    @click.option("-t", default=False, is_flag=True, help="sort by mtime")
    @click.option("-h", default=False, is_flag=True, help="human readable size")
    @click.argument("path", type=click.Path())
    def ls(path, l, r, t, h):
        """List directory contents on tape.

        \b
        PATH: path on tape
        """

        def sanitize_path(path):
            # 9th column of a long-listing row, without any " -> target" part.
            return path.split(None, 8)[-1].split(None)[0]

        with Pftp() as p:
            result = p.listing2(path)
            # aux maps a long-listing row to (size, modification stamp).
            aux = {}
            names = dict(zip(map(sanitize_path, result), result))
            paths = defaultdict(list)
            for name in names:
                paths[os.path.dirname(name)].append(name)
            for dname, _files in paths.items():
                mlist = dict(p.mlsd(dname))
                for ff in _files:
                    entry = mlist[ff]
                    aux[names[ff]] = int(entry.get("size", 0)), entry.get("modify", "")

        if t:
            # Newest first by default; -r switches to oldest first.
            result = sorted(result, key=lambda row: aux[row][-1])
            if not r:
                result.reverse()
        elif r:
            # NOTE(review): without -t the flag reverses the order returned
            # by the server, as its help text says -- confirm this is the
            # intended behaviour.
            result = list(reversed(result))
        if h:
            tmp = []
            for line in result:
                size = aux[line][0]
                parts = line.split(None, 8)
                parts[4] = sizeof_fmt(size)
                tmp.append(" ".join(parts))
            result = tmp
        if not l:
            result = [row.split(None, 8)[-1] for row in result]
        for row in result:
            click.echo(row)
        return

    @main.command()
    @click.argument("path")
    def exists(path):
        """check if PATH exists on tape.

        \b
        PATH: path on tape
        """
        with Pftp() as p:
            if not p.exists(path):
                click.echo("False")
            else:
                click.echo("True")

    @main.command()
    @click.option("-h", is_flag=True, help="human readable format")
    @click.argument("path")
    def size(path, h):
        """Prints file size in bytes. Use -h option for human readable format.

        \b
        PATH: path on tape
        """
        with Pftp() as p:
            if not p.exists(path):
                click.echo("File not found")
            else:
                fsize = p.size(path)
                if h:
                    fsize = sizeof_fmt(fsize)
                click.echo("{}".format(fsize))

    @main.command()
    @click.argument("path")
    def mkdir(path):
        """creates PATH on tape.

        \b
        PATH: path on tape
        """
        with Pftp() as p:
            if p.exists(path):
                click.echo("Already exists")
            else:
                p.makedirs(path)

    @main.command()
    @click.argument("path")
    def isdir(path):
        """checks if PATH is a directory.

        \b
        PATH: path on tape
        """
        with Pftp() as p:
            if not p.exists(path):
                click.echo("path not found")
            elif p.isdir(path):
                click.echo("True")
            else:
                click.echo("False")

    @main.command()
    @click.argument("path")
    def isfile(path):
        """checks if PATH is a file.

        \b
        PATH: path on tape
        """
        res = 1
        with Pftp() as p:
            if not p.exists(path):
                click.echo("path not found")
            elif p.isfile(path):
                click.echo("True")
                res = 0
            else:
                click.echo("False")
        # Exit status 0 only when PATH is a file.
        sys.exit(res)

    @main.command()
    @click.argument("path")
    def islink(path):
        """checks if PATH is a link.

        \b
        PATH: path on tape
        """
        with Pftp() as p:
            if not p.exists(path):
                click.echo("path not found")
            else:
                # Decide from the type character of the first stat line,
                # the same test isfile()/isdir() use to spot links.
                first_line = p.stat(path)[0]
                click.echo("True" if first_line.startswith("l") else "False")

    main()


if __name__ == "__main__":
    command_line_interface()