开发手册 欢迎您!
软件开发者资料库

Python 调用执行Linux中shell命令或程序的通用工具函数方法

本文主要介绍Python中,可以直接使用的执行Linux中shell命令,或者程序的通用工具函数方法。

1、通用代码

def get_error_info():    import traceback    errorMsg = traceback.format_exc()    return errorMsgdef get_preexec_fn(run_user):    '''        @name 获取指定执行用户预处理函数        @param run_user 运行用户        @return 预处理函数    '''    import pwd    pid = pwd.getpwnam(run_user)    uid = pid.pw_uid    gid = pid.pw_gid    def _exec_rn():        os.setgid(gid)        os.setuid(uid)    return _exec_rndef ExecShell(cmdstring, timeout=None, shell=True,cwd=None,env=None,user = None):    '''        @name 执行命令        @param cmdstring 命令        @param timeout 超时时间        @param shell 是否通过shell运行        @param cwd 进入的目录        @param env 环境变量        @param user 执行用户名        @return 命令执行结果    '''    a = ''    e = ''    import subprocess,tempfile    preexec_fn = None    tmp_dir = '/dev/shm'    if user:        preexec_fn = get_preexec_fn(user)        tmp_dir = '/tmp'    try:        rx = md5(cmdstring)        succ_f = tempfile.SpooledTemporaryFile(max_size=4096,mode='wb+',suffix='_succ',prefix='btex_' + rx ,dir=tmp_dir)        err_f = tempfile.SpooledTemporaryFile(max_size=4096,mode='wb+',suffix='_err',prefix='btex_' + rx ,dir=tmp_dir)        sub = subprocess.Popen(cmdstring, close_fds=True, shell=shell,bufsize=128,stdout=succ_f,stderr=err_f,cwd=cwd,env=env,preexec_fn=preexec_fn)        if timeout:            s = 0            d = 0.01            while sub.poll() is None:                time.sleep(d)                s += d                if s >= timeout:                    if not err_f.closed: err_f.close()                    if not succ_f.closed: succ_f.close()                    return 'Timed out'        else:            sub.wait()        err_f.seek(0)        succ_f.seek(0)        a = succ_f.read()        e = err_f.read()        if not err_f.closed: err_f.close()        if not succ_f.closed: succ_f.close()    except:        return '',get_error_info()    try:        #编码修正        if type(a) == bytes: a = a.decode('utf-8')        if type(e) == bytes: e = e.decode('utf-8')    except:pass    return

2、使用示例

ExecShell("wget -O {} {} --no-check-certificate".format(filename,url))