-
Notifications
You must be signed in to change notification settings - Fork 195
Home
Guannan Ma edited this page Mar 1, 2019
·
8 revisions
Welcome to the CUP wiki!
You can also visit http://cup.iobusy.com for more details.
目前基础库包含如下大的Package[每类包含若干小的package及module].
排序按照常用->较常用->专用
from cup import mail
mailer = mail.SmtpMailer(
'xxx@xxx.com', # sender
'xxxx.smtp.xxx.com', # smtp server
is_html=True # use html or not for email contents
)
mailer.sendmail(
[
'abc@baidu.com',
'xxx@baidu.com',
'yyy@baidu.com'
],
'test_img',
(
'testset <img src="cid:screenshot.png"></img>'
),
# attachment
[
'/home/work/screenshot.png',
'../abc.zip'
]
)
去除复杂log设置,即开即用
import logging
from cup import log
log.init_comlog(
'test',
log.DEBUG,
'/home/work/test/test.log',
log.ROTATION,
1024,
False
)
log.info('test xxx')
log.error('test critical')
# 支持重新设置日志level、日志文件位置、参数等,具体参数设置见文档
# Level: Datetime * [pid:threadid] [source.code:lineno] content
INFO: 2017-02-06 19:35:54,242 * [16561:140543940048640] [log.py:278] --------------------Log Initialized Successfully--------------------
INFO: 2017-02-06 19:35:54,242 * [16561:140543940048640] [cup_log_test.py:36] info
DEBUG: 2017-02-06 19:35:54,243 * [16561:140543940048640] [cup_log_test.py:40] debug
举例, 获得当前机器cpu/mem/net统计信息。 进程资源信息(所有/proc/pid能拿到的资源信息)
# 1. 获取系统信息, 可以获取cpu mem network_speed等等
import cup
# 计算60内cpu的使用情况。
from cup.res import linux
cpuinfo = linux.get_cpu_usage(intvl_in_sec=60)
print cpuinfo.usr
# get_meminfo函数取得系统Mem信息的回返信息。 是个namedtuple
# total, available, percent, used, free, active, inactive, buffers, cached是她的属性
from cup.res import linux
meminfo = linux.get_meminfo()
print meminfo.total
print meminfo.available
# 获取系统其他信息, 如启动时间、cpu情况、网卡情况等
cup.res.linux.get_boottime_since_epoch()# Returns: 回返自从epoch以来的时间。 以秒记.
cup.res.linux.get_cpu_nums()[source] # 回返机器的CPU个数信息
cup.res.linux.get_disk_usage_all() # 拿到/目录的使用信息, 回返是一个字典
cup.res.linux.get_disk_info() # 拿到Linux系统的所有磁盘信息
cup.res.linux.get_cpu_usage(intvl_in_sec=1) # 取得下个intvl_in_sec时间内的CPU使用率信息 回返CPUInfo
cup.res.linux.get_kernel_version() # 拿到Linux系统的kernel信息, 回返是一个三元组 e.g.(‘2’, ‘6’, ‘32’):
cup.res.linux.get_meminfo() # 获得当前系统的内存信息, 回返MemInfo数据结构。
cup.res.linux.get_net_recv_speed(str_interface, intvl_in_sec) # 获得某个网卡在intvl_in_sec时间内的收包速度
cup.res.linux.get_net_through(str_interface) # 获取网卡的收发包的统计信息。 回返结果为(rx_bytes, tx_bytes)
cup.res.linux.get_net_transmit_speed(str_interface, intvl_in_sec=1) # 获得网卡在intvl_in_sec时间内的网络发包速度
print cup.res.linux.get_net_transmit_speed('eth1', 5)
cup.res.linux.get_swapinfo() # 获得当前系统的swap信息
cup.res.linux.net_io_counters() # net io encounters
# 2. 获取进程相关信息, 移植自psutil,可以获取/proc/xxxpid进程内的所有信息
# 也可以获取该进程下所有子进程相关的信息, 详见代码
get_connections(*args, **kwargs)
get_cpu_times(*args, **kwargs)
get_ext_memory_info(*args, **kwargs)
get_memory_info(*args, **kwargs)
get_memory_maps()
get_num_ctx_switches(*args, **kwargs)[source]
get_num_fds(*args, **kwargs)[source]
get_open_files(*args, **kwargs)[source]
get_process_cmdline(*args, **kwargs)[source]
get_process_create_time(*args, **kwargs)[source]
get_process_cwd(*args, **kwargs)[source]
get_process_exe()[source]
获得进程的Exe信息。 如果这个进程是个daemon,请使用get_process_name
get_process_gids(*args, **kwargs)[source]
get_process_io_counters(*args, **kwargs)[source]
get_process_name(*args, **kwargs)[source]
get_process_nice(*args, **kwargs)[source]
get_process_num_threads(*args, **kwargs)[source]
get_process_ppid(*args, **kwargs)[source]
get_process_status(*args, **kwargs)[source]
get_process_threads(*args, **kwargs)[source]
get_process_uids(*args, **kwargs)[source]
nt_mmap_ext alias of mmap
nt_mmap_grouped alias of mmap
举例,支持超时kill的shell命令执行
# 特色示例
# 1. 拥有超时控制的shell执行. 超时shell命令会被kill (SIGTERM)
import cup
shelltool = cup.shell.ShellExec()
print shelltool.run('/bin/ls', timeout=1)
# 2. 其他类,比如使用rm -rf删除某路径,获取文件MD5等
cup.shell.md5file(filename) # 计算一个文件的md5值。返回32位长的hex字符串。
cup.shell.kill9_byname(strname) # kill -9 process by name
cup.shell.del_if_exist(path) # 如果文件/目录/symlink存在则删除他们
# 3. 进行远程shell操作 (内部调用了pexpect)
# 具体参照 http://docs.iobusy.com/docs/cup/cup.shell.html#module-cup.shell.expect
# 其中exit_status为本地ssh命令退出码, remote_status为远程的退出码
各类util类pacakge。举例,线程池,conf(对应c++ Configure及hdfs xxx.xml),生成器、自动等待触发器
- http://docs.iobusy.com/docs/cup/cup.util.html#module-cup.util.autowait 阻塞直到超时或者某事件发生
- http://docs.iobusy.com/docs/cup/cup.util.html#module-cup.util.threadpool 线程池文档
- http://docs.iobusy.com/docs/cup/cup.util.html#module-cup.util.thread 可打断线程
#1 支持类型丰富、带内嵌套、数组的文档 http://docs.iobusy.com/docs/cup/cup.util.html#cup.util.conf.Configure2Dict
# test.conf
# 第一层,Global layer, key:value
host: abc.com
port: 12345
# 1st layer [monitor]
@disk: sda
@disk: sdb
# 第二层配置 conf_dict['section']
[section]
# 支持数组 conf_dict['section']['disk'] =>是个一维数组
@disk: sda
@disk: sdb
# 支持层级数组 conf_dict['section']['ioutil'] => 是个一维数组
[@ioutil]
util: 80 # accessed by conf_dict['section']['ioutil'][0]['util']
[@ioutil]
util: 60
[monitor]
timeout: 100
regex: sshd
# 2nd layer that belongs to [monitor]
# 第三层配置 conf_dict['monitor']['timeout']
[.timeout]
# key:value in timeout
max: 100
# 3rd layer that belongs to [monitor] [timeout]
[..handler]
default: exit
# 2. 使用线程池scp分发文件
class CRemote(object):
"""
handle remote things
"""
def __init__(self, username, password, retry=0):
self._username = username
self._passwd = password
self._wget_retry = retry
self._thdpool = threadpool.ThreadPool(minthreads=2, maxthreads=4)
self._generator = generator.CGeneratorMan()
# pylint: disable=R0913
def _thread_scp_dir_by_tarfile(self,
hostname, dst, tarpath, ret_list, timeout
):
"""
do sth in a thread。 执行该scp命令的实际线程. 执行scp
"""
if os.path.normpath(dst) == '/':
raise IOError('You CANNOT delete root /')
tarfile_name = os.path.basename(tarpath)
log.info('to mkdir in case it does not exists:{0}'.format(dst))
cmd = 'mkdir -p {0};'.format(dst, dst)
ret = self.remote(hostname, cmd, 10)
ret['hostname'] = hostname
if ret['exitstatus'] != 0 or ret['remote_exitstatus'] != 0:
log.critical('mkdir dst failed, error info:{0}'.format(ret))
ret_list.append(ret)
return
log.info('scp_dir_by_tarfile, src tarfile:{0} dst:{1}'.format(
tarpath, dst)
)
ret = self.lscp(tarpath, hostname, dst)
ret['hostname'] = hostname
if ret['exitstatus'] != 0 or ret['remote_exitstatus'] != 0:
log.critical('lscp failed, error info:{0}'.format(ret))
ret_list.append(ret)
else:
cmd = 'cd {0}; tar zxf {1}; rm -f {2}'.format(
dst, tarfile_name, tarfile_name
)
ret = self.remote(hostname, cmd, timeout=timeout)
ret['hostname'] = hostname
ret_list.append(ret)
log.info('succeed to scp dir, hostname:{0}'.format(hostname))
return
def scp_hosts_dir_by_tarfile(self,
src, hosts, dst, tempfolder, tarfile_name=None, timeout=600
):
"""
scp a folder to multiple hosts
使用线程池,多线程并发的scp拷贝本地目录到远程机器目录
:hosts:
hosts that will be scped
:tempfolder:
temp folder which will store tar file temporarily.
"""
src = os.path.normpath(src)
if tarfile_name is None:
tarfile_name = '{0}.{1}.tar.gz'.format(
net.get_local_hostname(), time.time()
)
src_folder_name = os.path.basename(src)
name = '{0}/{1}'.format(tempfolder, tarfile_name)
log.info('to compress folder into tarfile:{0}'.format(name))
with tarfile.open(tempfolder + '/' + tarfile_name, 'w:gz') as tar:
tar.add(src, src_folder_name)
ret_list = []
# 启动线程池
self._thdpool.start()
for hostname in hosts:
# 像线程池扔作业
self._thdpool.add_1job(
self._thread_scp_dir_by_tarfile,
hostname,
dst,
name,
ret_list,
timeout
)
time.sleep(2)
# 完成后,等待线程池关闭
self._thdpool.stop() # 注意,线程池也提供try_stop接口,具体可以看文档
# if not ret:
# errmsg = 'failed to stop threadpool while dispath files'
# content = errmsg
# petaqa.peta.lib.common.send_mail(errmsg, content)
# log.error(errmsg)
# log.error(content)
os.unlink(name)
return ret_list
# 3. 生成唯一码、唯一数等
# http://docs.iobusy.com/docs/cup/cup.util.html#module-cup.util.generator
两个重要部分:
- a. 网卡信息获取、路由信息获取、socket参数设置
- b. 异步消息通信协议支持
# 1. 获取路由信息
from cup.net import route
ri = route.RouteInfo()
print json.dumps(ri.get_route_by_ip('10.32.19.92'), indent=1)
print json.dumps(ri.get_routes(), indent=1)
{
"Use": "0",
"Iface": "eth1",
"Metric": "0",
"Destination": "10.0.0.0",
"Mask": "255.0.0.0",
"RefCnt": "0",
"MTU": "0",
"Window": "0",
"Gateway": "10.226.71.1",
"Flags": "0003",
"IRTT": "0"
}
[
{
"Use": "0",
"Iface": "eth1",
"Metric": "0",
"Destination": "10.226.71.0",
"Mask": "255.255.255.0",
"RefCnt": "0",
"MTU": "0",
"Window": "0",
"Gateway": "0.0.0.0",
"Flags": "0001",
"IRTT": "0"
},
# 2. 获取ip && 设置socket参数等
from cup import net
# set up socket params
net.set_sock_keepalive_linux(sock, 1, 3, 3)
net.set_sock_linger(sock)
net.set_sock_quickack(sock)
net.set_sock_reusable(sock, True) # port resuable
# get ipaddr of a network adapter/interface
print net.getip_byinterface('eth0')
print net.getip_byinterface('eth1')
print net.getip_byinterface('xgbe0') # 万兆网卡
# get ipaddr of a hostname
print net.get_hostip('abc.test.com')
http://docs.iobusy.com/docs/cup/cup.net.async.html#module-cup.net.async.msgcenter
http://blog.iobusy.com/?p=188 异步消息库属于专用且较为复杂的通信库,为高吞吐、高效网络通信场景使用,请注意你的使用场景是否匹配
举例,一键变类为Singleton类。
import cup
import cup.jenkinslib
############### quick start ###############
jenkins = cup.jenkinslib.Jenkins('cup.jenkins.baidu.com')
job = jenkins['cup_quick']
print job.name, job.last_stable_build_number, job.description
print job[5], job["lastSuccessBuild"], job.last_stable_build
qi = job.invoke()
build = qi.block_until_building()
print build.name, build.number, build.timestamp
try:
build.block_until_complete(timeout=20)
except cup.jenkinslib.RunTimeout as err:
print "timeout:", err
build.stop()
print build.duration, build.result, build.description
build.description = "new description"
jenkins.enable_ftp('ftp.baidu.com', 'cup', 'password', 22)
with build.ftp_artifacts as af:
af['artifacts_path'].download('./local_path')
各类延时、queue执行器、内存池、心跳service等
http://docs.iobusy.com/docs/cup/cup.services.html
# 1. Buffer内存缓冲以及Bufferpool内存缓冲池
# http://docs.iobusy.com/docs/cup/cup.services.html#module-cup.services.buffers
# 由于Python内存管理的特点,在比如网络通信以及频繁(稍大)内存的申请释放时候会退化为malloc/free。 具体可见两篇比较好的解释文章:
# (a). stackflow的关于什么时候可以用bytearray
# http://stackoverflow.com/questions/9099145/where-are-python-bytearrays-used
# (b). Python的内存管理
# http://leyafo.logdown.com/posts/159345-python-memory-management
# 这个时候直接使用str来存储效率低下,但好消息是python原生提供了bytearray这样的数据结构。利用她mutable可变的特性做成Buffer,进一步组成Bufferpool内存缓存池,非常适配频繁申请释放较大内存的场景。
import os
import sys
import logging
_TOP = os.path.dirname(os.path.abspath(__file__)) + '/../'
sys.path.insert(0, _TOP)
from cup.services import buffers
from cup import unittest
class CTestServiceBuffer(unittest.CUTCase):
"""
service buffer
"""
def __init__(self):
super(self.__class__, self).__init__()
self._buffpool = buffers.BufferPool(
buffers.MEDIUM_BLOCK_SIZE,
102400,
)
def test_run(self):
"""test_run"""
ret, buff = self._buffpool.allocate(102401) # 回返的是Buffer数据结构
unittest.assert_eq(ret, False)
ret, buff = self._buffpool.allocate(10)
unittest.assert_eq(ret, True)
# pylint: disable=W0212
unittest.assert_eq(self._buffpool._used_num, 10)
unittest.assert_eq(self._buffpool._free_num, 102390)
ret = buff.set('a' * 10 * buffers.MEDIUM_BLOCK_SIZE) # Buffer数据结构支持set语义
unittest.assert_eq(ret[0], True)
ret = buff.set('a' * (10 * buffers.MEDIUM_BLOCK_SIZE + 1))
unittest.assert_ne(ret[0], True)
self._buffpool.deallocate(buff) # 归还buffer到线程池
# pylint: disable=W0212
unittest.assert_eq(self._buffpool._used_num, 0)
unittest.assert_eq(self._buffpool._free_num, 102400)
unittest.assert_eq(len(self._buffpool._free_list), 102400)
unittest.assert_eq(len(self._buffpool._used_dict), 0)
import os
import sys
import logging
_TOP = os.path.dirname(os.path.abspath(__file__)) + '/../'
sys.path.insert(0, _TOP)
from cup.services import buffers
from cup import unittest
class CTestServiceBuffer(unittest.CUTCase):
"""
service buffer
"""
def __init__(self):
super(self.__class__, self).__init__()
self._buffpool = buffers.BufferPool(
buffers.MEDIUM_BLOCK_SIZE,
102400,
)
def test_run(self):
"""test_run"""
ret, buff = self._buffpool.allocate(102401) # 回返的是Buffer数据结构
unittest.assert_eq(ret, False)
ret, buff = self._buffpool.allocate(10)
unittest.assert_eq(ret, True)
# pylint: disable=W0212
unittest.assert_eq(self._buffpool._used_num, 10)
unittest.assert_eq(self._buffpool._free_num, 102390)
ret = buff.set('a' * 10 * buffers.MEDIUM_BLOCK_SIZE) # Buffer数据结构支持set语义
unittest.assert_eq(ret[0], True)
ret = buff.set('a' * (10 * buffers.MEDIUM_BLOCK_SIZE + 1))
unittest.assert_ne(ret[0], True)
self._buffpool.deallocate(buff) # 归还buffer到线程池
# pylint: disable=W0212
unittest.assert_eq(self._buffpool._used_num, 0)
unittest.assert_eq(self._buffpool._free_num, 102400)
unittest.assert_eq(len(self._buffpool._free_list), 102400)
unittest.assert_eq(len(self._buffpool._used_dict), 0)
# 2. Exec执行器
# 3. 延迟执行器
# 谁没有延迟个XX秒,或者周期性XX秒后执行某件事情的需求呢?
from cup import log
from cup.services import executor
class TestMyCase(unittest.CUTCase):
"""
test class for cup
"""
def __init__(self):
super(self.__class__, self).__init__(
'./test.log', log.DEBUG
)
log.info('Start to run ' + str(__file__))
self._executor = executor.ExecutionService(
)
def setup(self):
"""
setup
"""
self._executor.run()
self._info = time.time()
def _change_data(self, data=None):
self._info = time.time() + 100
def test_run(self):
"""
@author: maguannan
"""
self._executor.delay_exec(5, self._change_data, 1) # 延迟5秒执行某个函数, 同时支持函数参数传递
time.sleep(2)
assert time.time() > self._info
time.sleep(5)
assert time.time() < self._info
def teardown(self):
"""
teardown
"""
cup.log.info('End running ' + str(__file__))
self._executor.stop()
if __name__ == '__main__':
cup.unittest.CCaseExecutor().runcase(TestMyCase())
from cup import log
from cup.services import executor
class TestMyCase(unittest.CUTCase):
"""
test class for cup
"""
def __init__(self):
super(self.__class__, self).__init__(
'./test.log', log.DEBUG
)
log.info('Start to run ' + str(__file__))
self._executor = executor.ExecutionService(
)
def setup(self):
"""
setup
"""
self._executor.run()
self._info = time.time()
def _change_data(self, data=None):
self._info = time.time() + 100
def test_run(self):
"""
@author: maguannan
"""
self._executor.delay_exec(5, self._change_data, 1) # 延迟5秒执行某个函数, 同时支持函数参数传递
time.sleep(2)
assert time.time() > self._info
time.sleep(5)
assert time.time() < self._info
def teardown(self):
"""
teardown
"""
cup.log.info('End running ' + str(__file__))
self._executor.stop()
if __name__ == '__main__':
cup.unittest.CCaseExecutor().runcase(TestMyCase())
# 3. QueueExec排队方式执行
# 支持排队执行某些带参数传递的函数,执行的并发度取决于执行器进行初始化的时候的线程数设置.排队执行器支持高优任务插队,urgency=0时自动插入到队首。 默认urgency=1
# 只要queue给执行器后,就喝个咖啡去吧。
def queue_exec(self, function, data, urgency=1):
"""
execute function in a queue. Functions will be queued in line to
be scheduled.
urgency := [0|1|2]. 0 is most urgent. 1 is normal. 2 is lest urgent
"""
task_data = (urgency, (function, data))
self.__exec_queue.put(task_data)
cup.err cup基础库相关的异常module.
cup.const 静态不可变变量支持
cup.oper 命令操作类相关module. 举例,cup.oper.rmrf、进程存在性检查、pid获取等等.
cup.timeplus 时间相关module. 简化time操作函数。
- run # python setup.py install
- CUP use pexpect and httplib.
* Pexpect http://pexpect.sourceforge.net/ (under MIT license)
* Httplib2 http://code.google.com/p/httplib2/ (under MIT license)