求知 文章 文库 Lib 视频 iPerson 课程 认证 咨询 工具 讲座 Modeler   Code  
会员   
 
  
 
 
     
   
分享到
一种搭建分布式测试环境和批量性能测试的思路
 
发布于2013-1-18
 

背景

在搜索引擎的测试过程中,经常会遇到以下两个问题:

  • 需要搭建和更新分布式测试环境
  • 在性能测试时,我们需要测试不同集群规模和配置下的环境时,如何自动更新测试环境和批量进行性能测试

因此,我们需要设计一个脚本,这个脚本可以帮我来完成这些事。

在这里,我推荐使用Python,理由有:

  • 写起来比较快(测试时间本来就比较紧张),不可能用C或者Java了
  • 语法比较清晰,Shell、Perl这些维护起来太乱
  • 自带的库、第三方的库比较丰富
  • 另外,我个人比较喜欢Python的mako模版引擎和paramikossh2库。

其实不用paramiko也可以,只要把机器ssh打通就可以。但我个人不太喜欢这种方式,觉得耦合性太强(只能在Linux下运行了)

设计

批量性能测试的设计

我很喜欢采用YAML格式,YAML格式的一大好处就是可以很方便的定义List、Map等类型

 tasks: 
 # 第一个测试用例,我可能需要测试单线程的情况 
 - 
    id:1# ID的作用是你在脚本中可以拿id作为结果存放的目录 
    parallelNum:1# 并发数 
    seconds:1800# 压半个小时 
    targetHost:10.20.137.22 # 目标主机 
    targetPort:9999 
    queryFilePath:/home/admin/access-log/add-600w.query  # 请求放在这儿 
 # 第2个测试用例,我可能需要测试2线程的情况,这时我就只要再写一个,然后parallelNum: 2就可以了 
 - 
    id:1 
    parallelNum:2 
    seconds:1800 
    targetHost:10.20.137.22 

    targetPort:9999 
    queryFilePath:/home/admin/access-log/add-600w.query

在阿里的搜索平台这边,我们大多使用abench作为性能测试工具,它是一个命令行工具,只要命令+参数就可以了,比起JMeter要写JMeter脚本简单。因此,我再在配置文件中设计一下abench的命令格式

因为在运行命令中,有很多参数需要替换成上述测试用例设定的参数,因此需要采用模版引擎的方式。Python的模版引擎很多,我个人比较推荐mako。

  abenchPath:/opt/usr/bin/abench  # abench在哪儿? 
  abenchCommand:"${abenchPath} -p ${parallelNum} -s ${seconds} -k --http -o /dev/null ${targetHost} 
${targetPort} ${queryFilePath}" 

配置文件设计好了,下面我们来写我们的Python脚本了(这里仅仅给出一些主要代码,大致明白意思就可以了)

 import subprocess 
 from mako.template import Template 
 import yaml 
 
 # 运行一个测试任务 
 def runTask(config, task): 
    runAbench(config, task) 
 
 def runAbench(config, task): 
     # 得到完成的abench运行命令 
     command = Template(config["abenchCommand"]).render( 
         abenchPath=config["abenchPath"], 
         parallelNum=task["parallelNum"], 
         seconds=task["seconds"], 
         targetHost=task["targetHost"], 
         targetPort=task["targetPort"], 
         queryFilePath=task["queryFilePath"], 
         ) 
     pipe = subprocess.Popen(command, 
         stdin=subprocess.PIPE, 
         stdout=subprocess.PIPE, 
         stderr=subprocess.PIPE, 
         shell=True 
         ) 
     # 读取abench的运行结果,因为可能得保存下来吧 
     result = pipe.stdout.read() 
     # 下面可能是保存结果什么的 
 
 if __name__ == "__main__": 
    config = yaml.load(file(configFile)) 
    for task in config["tasks"]: 
       runTask(config, task)

自动更新测试环境

在我实际测试过程中,因为要更新的环境其实相当复杂,最多的时侯需要去10几台机器上做更新环境、停止/启动进程的操作。但我这里主要介绍思路,多一些机器和进程其实都一样。

接着刚才的配置文件,我们只是在每一个task中设计了加压任务,但在加压前需要更新哪些环境没有涉及,按照阿里巴巴的ISearch架构,我就启动一个一行两列的Searcher环境,2列Searcher上有一个Merger,然后再有一个clustermap来监控。

 abenchPath: /opt/usr/bin/abench  # abench在哪儿? 
 abenchCommand: "${abenchPath} -p ${parallelNum} -s ${seconds} -k --http -o /dev/null ${targetHost} ${targetPort}
 ${queryFilePath}" 
 # 关于Searcher的一些通用配置 
 searcher: 
     templateConfigFile: /home/admin/access-log/searcher_server.cfg  
     # 因为启动时的监听端口等信息需要从下面的运行任务中读取,因此这个也设计成一个模版文件 
     templateLogConfigFile: /home/admin/access-log/searcher_log.cfg 
     # 在Search机器上操作的命令 
     commands: 
         - "${searchRoot}/bin/is_searcher_server -c ${configFile} -l ${logConfigFile} -k stop > /dev/null 2>&1" 
         - "${searchRoot}/bin/is_searcher_server -c ${configFile} -l ${logConfigFile} -k start -d > /dev/null 2>&1" 
 # 关于Merger的一些通用配置,和Searcher差不多,就不写了 
  
 tasks: 
   # 第一个测试用例,我可能需要测试单线程的情况 
   - 
     id: 1 # ID的作用是你在脚本中可以拿id作为结果存放的目录 
     parallelNum: 1 # 并发数 
     seconds: 1800 # 压半个小时 
     targetHost: 10.20.137.22  # 目标主机 
     targetPort: 9999 
     queryFilePath: /home/admin/access-log/add-600w.query  # 请求放在这儿 
  
     # 两台Search机器,定义一个List 
     searchers: 
        - 
          host: 10.20.150.61 
          port: 6322 # 监听的端口 
          username: test 
		  # 因为需要通过ssh协议登录上去操作,因此需要用户名密码。如果你已经把机器ssh都打通了,那就不需要了 
          password: 12345 
          configFile: "${searchRoot}/scripts/conf/searcher_server.cfg" # 启动时运行的配置文件 
          logConfigFile: "${searchRoot}/scripts/conf/searcher_log.cfg" # 启动时运行的日志文件 
        - 
          host: 10.20.150.60 
          port: 6322 
          username: test 
          password: 12345 
          configFile: "${searchRoot}/scripts/conf/searcher_server.cfg" 
          logConfigFile: "${searchRoot}/scripts/conf/searcher_log.cfg" 
  
     # 我这边只有一台merger,如果merger也是有多台的话,也可以把这个设计成一个List 
     merger: 
        host: 10.20.137.22 
        port: 6088 
        username: test 
        password: 12345 
        configFile: "${searchRoot}/scripts/conf/merger_server.cfg" 

然后比如关于Searcher的配置文件,在上面也是一个模版文件阿,我们可以把这个文件设计成:

  se_conf_file=${searchRoot}/scripts/conf/se.conf 
  simon_conf_path=${searchRoot}/scripts/conf/simon_searcher.xml 
  sort_config=${searchRoot}/scripts/conf/searcher_sort.xml 
  cache_size=0 
  cache_min_doc=0 
  conn_queue_limit=500 
  [services] 
  tcp ${port} # 主要就是为了替换监听的端口,其实要做得通用一点的话,很多配置都可以搞成变量,但就是可能你自己的配置
文件变得很复杂。因此我们能不改的就尽量不改。 
   
  [clustermap] 
  local_config_path=${searchRoot}/scripts/conf/clustermap.xml 

上述就是关于searcher和merger多行多列的配置,下面我们完善一下我们刚才的Python脚本

 # 得的一个ssh登录后的client对象,用于调用远程机器上的命令 
 def getClient(host, port, username, password): 
     client = paramiko.SSHClient() 
     client.load_system_host_keys() 
     client.set_missing_host_key_policy(paramiko.WarningPolicy() 
     client.connect(hostname, port, username, password) 
     return client 
  
 # 得到一个sftp对象,因为需要scp渲染好的配置文件什么的,因此需要sftp对象,它的put方法其实就类似scp 
 def getSftp(host, port, username, password): 
     transport = paramiko.Transport((hostname, port)) 
     transport.connect(username=username, password=password) 
     sftp = paramiko.SFTPClient.from_transport(transport) 
     return sftp 
  
 # 更新和部署Searchers 
 def cleanSearchers(config, searchers): 
    for searcher in searchers: 
        # 得到渲染好的配置文件的内容 
        templateLine = Template(file(config["searcher"]["templateConfigFile"]).read()).render( 
            port=searcher["port"], 
            searchRoot=config["searchRoot"] 
            ) 
        # 将渲染好的配置文件写入一个临时文件 
        tmpConfigFile = tempfile.NamedTemporaryFile(delete=False) 
        tmpConfigFile.file.write(templateLine) 
        tmpConfigFile.file.close() 
        # 将这个临时文件scp拷远程机器上的哪儿 
        targetConfigFile = Template(searcher["configFile"]).render(searchRoot=config["searchRoot"]) 
        sftp = getSftp(searcher["host"], 22, searcher["username"], searcher["password"]) 
        sftp.put(tmpConfigFile.name, targetConfigFile) 
        sftp.close() 
        # 删除掉之前的临时文件 
        os.remove(tmpConfigFile.name) 
        # 运行启动searcher的命令 
        client = getClient(searcher["host"], 22, searcher["username"], searcher["password"]) 
        for command in config["searcher"]["commands"]: 
            command = Template(command).render( 
                searchRoot=config["searchRoot"], 
                configFile=targetConfigFile, 
                logConfigFile=targetLogConfigFile 
                ) 
            client.exec_command(cmd) 
        client.close()  

关于clustermap的配置

在阿里巴巴的ISearch架构中,searchers几行几列是由clustermap来配置的,我们这边也稍微简单话一点,不考虑merger有多台的情况,就设计searchers几行几列的情况。更新一下刚才在task中的配置,加上关于clustermap的配置

      clustermap: 
        host: 10.20.137.22 
        username: test 
        password: 12345 
        configFile: "${searchRoot}/scripts/conf/clustermap.xml" 
        # 一台merge 
        merger: 
          host: 10.20.137.22 
          port: 6088 
        # 关于searcher的配置,其实是一个二维数组。第一个纬度是列,第2个纬度是行。以下这个例子是1列2行 
        searchers: 
          - 
            servers:  # 同一列下的机器 
              - 
                host: 10.20.150.61 
                port: 6322 
          - 
            servers: 
              - 
                host: 10.20.150.60 
                port: 6322 

上述是1列2行的例子,如果要配成2行2列就只要在searchers部分配成:

       searchers: 
         - 
           servers:  # 同一列下的机器 
             - 
               host: 10.20.150.61 
               port: 6322 
             - 
               host: 10.20.150.59 
               port: 6322 
         - 
           servers: 
             - 
               host: 10.20.150.60 
               port: 6322 
             - 
               host: 10.20.150.62 
               port: 6322 

然后为了迎合clustermap配置的这种设计,在clustermap的模版配置文件也需要改一下:

 <?xml version="1.0"?> 
 <clustermap> 
     <!-- 关于Merger的配置,这里我暂时不考虑merger多台的情况 --> 
     <merger_list> 
             <merger_cluster name=m1 level=1> 
                 <merger ip=${merger["host"]} port=${merger["port"]} protocol=http/> 
             </merger_cluster> 
     </merger_list> 
 
 <!-- 下面是searcher的多行行列的配置,是一个二维数组 --> 
 <search_list> 
 <% 
 id = 1  # 这个值是纪录searcher列的名字 
 %> 
 <!-- 第一个纬度,同一列的 --> 
 % for searcher in searchers: 
 <search_cluster name=c${id} docsep=false level=1 partition=0> 
     <!-- 第二个纬度,同一行的 --> 
     % for server in searcher["servers"]: 
     <search ip=${server["host"]} port=${server["port"]} protocol=tcp type=mix /> 
     % endfor 
 </search_cluster> 
     <% 
     id += 1 
     %> 
 % endfor 
 </search_list> 
  
     <merger_cluster_list> 
             <merger_cluster name=m1> 
                 % for i in range(1, id): 
                 <search_cluster name=c${i} /> 
                 % endfor 
             </merger_cluster> 
     </merger_cluster_list> 
 </clustermap> 

这样比如1行2列渲染出来成了:

  <?xml version="1.0"?> 
  <clustermap> 
      <merger_list> 
              <merger_cluster name=m1 level=1> 
                  <merger ip=10.20.137.22 port=6088 protocol=http/> 
             </merger_cluster> 
      </merger_list> 
  
  <search_list> 
  <search_cluster name=c1 docsep=false level=1 partition=0> 
      <search ip=10.20.150.60 port=6322 protocol=tcp type=mix /> 
  </search_cluster> 
  <search_cluster name=c1 docsep=false level=1 partition=0> 
      <search ip=10.20.150.61 port=6322 protocol=tcp type=mix /> 
  </search_cluster> 
  </search_list> 
   
      <merger_cluster_list> 
              <merger_cluster name=m1> 
                  <search_cluster name=1 /> 
              </merger_cluster> 
      </merger_cluster_list> 
  </clustermap>  

总结

上述就是我在测试中,对分布式环境的自动更新和批量性能测试,这样大大减少了我们来回捣固机器、修改配置的时间。而且对测试结果的自动收集和解析也可以帮助我们来分析测试结果。我觉得这是一个不错的尝试,大家可以都可以试试看。




LoadRunner性能测试基础
软件测试结果分析和质量报告
面向对象软件测试技术研究
设计测试用例的四条原则
功能测试中故障模型的建立
性能测试综述
更多...   


性能测试方法与技术
测试过程与团队管理
LoadRunner进行性能测试
WEB应用的软件测试
手机软件测试
白盒测试方法与技术


某博彩行业 数据库自动化测试
IT服务商 Web安全测试
IT服务商 自动化测试框架
海航股份 单元测试、重构
测试需求分析与测试用例分析
互联网web测试方法与实践
基于Selenium的Web自动化测试
更多...