11.4 Gym-WAF架构

Gym-WAF基于OpenAI Gym和Keras-rl开发,主要由DQNAgent,WafEnv_v0,Waf_Check,Xss_Manipulator和Features组成。如图11-2所示,Features将XSS样本转换成向量,Waf_Check基于规则用于XSS检测,XSS的特征向量作为状态传递。DQNAgent基于当前状态和一定的策略,选择免杀动作。WafEnv_v0根据免杀动作,通过Xss_Manipulator针对XSS样本执行免杀操作,然后使用Features重新计算特征,再使用Waf_Check判断,如果不是XSS,反馈10并结束本轮学习;如果是XSS,反馈0以及新状态给DQNAgent,DQNAgent继续选择下一步免杀操作,如此循环。下面我们将介绍每个组件的具体原理和实现。

图11-2 Gym-WAF架构

11.4.1 Features类

Features类负责将字符串形式的XSS攻击样本提取特征,转换成机器学习模型可以使用的向量。最简单的一种实现就是把字符串转换成字节直方图,即Byte Histogram。把字符串转换成字节数组,然后统计每个字符出现的次数,这样可以有效地区分不同字符的组成情况。同时为了避免出现次数最多的字符对模型的不利影响,我们增加一个维度代表字符串长度,同时使用该长度对所有字节出现的次数取平均值,代码如下:


def byte_histogram(self,str):
    bytes=[ord(ch) for ch in list(str)]
    h = np.bincount(bytes, minlength=256)
    return np.concatenate([
        [h.sum()],  
        h.astype(self.dtype).flatten() / h.sum(),  
    ])

通过字节直方图和字符串长度,我们得到了一个257维的特征向量:


def extract(self,str):
    featurevectors = [
        [self.byte_histogram(str)]
    ]
    return np.concatenate(featurevectors)

11.4.2 Xss_Manipulator类

Xss_Manipulator类实现了针对XSS样本的免杀操作,定义对应的转换表ACTION_TABLE,代码如下:


ACTION_TABLE = {
'charTo16': 'charTo16',
'charTo10': 'charTo10',
'charTo10Zero': 'charTo10Zero',
'addComment': 'addComment',
'addTab': 'addTab',
'addZero': 'addZero',
'addEnter': 'addEnter',
}

其中,重要的参数介绍如下。

1.charTo16

charTo16函数实现了随机将标签内容中的字符转换成对应的十六进制编码。charTo16随机选择字符,将该字符转换成对应的十六进制,并且格式类似a,然后进行替换,随机替换1~3次,代码如下:


def charTo16(self,str,seed=None):
    matchObjs = re.findall(r'[a-qA-Q]', str, re.M | re.I)
    if matchObjs:
        modify_char=random.choice(matchObjs)
        #字符转ascii值ord(modify_char
        modify_char_16="&#{};".format(hex(ord(modify_char)))
        #替换
        str=re.sub(modify_char, modify_char_16, str,count=random.randint(1,3))
    return str

2.charTo10

charTo10函数实现了随机将标签内容中的字符转换成对应的十进制编码。charTo10随机选择字符,将该字符转换成对应的十进制,并且格式类似a,然后进行替换,代码如下:


def charTo10(self,str,seed=None):
    matchObjs = re.findall(r'[a-qA-Q]', str, re.M | re.I)
    if matchObjs:
        modify_char=random.choice(matchObjs)
        #字符转ascii值ord(modify_char
        #modify_char_10=ord(modify_char)
        modify_char_10="&#{};".format(ord(modify_char))
        #替换
        str=re.sub(modify_char, modify_char_10, str)
    return str

3.charTo10Zero

charTo10Zero与charTo10功能类似,唯一不同的是十进制的数字前面增加了几个0字符,代码如下:


def charTo10Zero(self,str,seed=None):
    matchObjs = re.findall(r'[a-qA-Q]', str, re.M | re.I)
    if matchObjs:
        modify_char=random.choice(matchObjs)
        #字符转ascii值ord(modify_char
        modify_char_10="&#000000{};".format(ord(modify_char))
        #替换
        str=re.sub(modify_char, modify_char_10, str)
    return str

4.addComment

addComment函数实现了随机向标签内容中增加注释的功能。addComment随机选择字符,在该字符后面增加注释,注释的内容可以随机,也可以设置成固定内容,代码如下:


def addComment(self,str,seed=None):
    matchObjs = re.findall(r'[a-qA-Q]', str, re.M | re.I)
    if matchObjs:
        #选择替换的字符
        modify_char=random.choice(matchObjs)
        #生成替换的内容
        modify_char_comment = "{}/*8888*/".format(modify_char)
        #替换
        str=re.sub(modify_char, modify_char_comment, str)
    return str

5.addTab

addTab函数实现了随机向标签内容中增加TAB的功能。addTab随机选择字符,将该字符前面增加Tab,代码如下:


def addTab(self,str,seed=None):
    matchObjs = re.findall(r'[a-qA-Q]', str, re.M | re.I)
    if matchObjs:
        #选择替换的字符
        modify_char=random.choice(matchObjs)
        #生成替换的内容
        modify_char_tab="   {}".format(modify_char)
        #替换
        str=re.sub(modify_char, modify_char_tab, str)
    return str

6.addZero

addZero函数实现了随机向标签内容中增加\0的功能。addZero随机选择字符,将该字符前面增加\0,代码如下:


def addZero(self,str,seed=None):
    matchObjs = re.findall(r'[a-qA-Q]', str, re.M | re.I)
    if matchObjs:
        #选择替换的字符
        modify_char=random.choice(matchObjs)
        #生成替换的内容
        modify_char_zero="\\00{}".format(modify_char)
        #替换
        str=re.sub(modify_char, modify_char_zero, str)
    return str

7.addEnter

addEnter函数实现了随机向标签内容中增加回车的功能。addEnter随机选择字符,将该字符前面增加回车,代码如下:


def addEnter(self,str,seed=None):
    matchObjs = re.findall(r'[a-qA-Q]', str, re.M | re.I)
    if matchObjs:
        #选择替换的字符
        modify_char=random.choice(matchObjs)
        #生成替换的内容
        modify_char_enter="\\r\\n{}".format(modify_char)
        #替换
        str=re.sub(modify_char, modify_char_enter, str)
    return str

8.modify

modify实现了使用函数名称的字符串访问函数的功能,如图11-3所示,modify需要和ACTION_TABLE和ACTION_LOOKUP配合使用,代码如下:


def modify(self,str, _action, seed=6):
    print "Do action :%s" % _action
    action_func=Xss_Manipulator().__getattribute__(_action)
    return action_func(str,seed)

图11-3 通过动作序号执行函数的流程图

11.4.3 DQNAgent类

DQNAgent具体实现了强化学习算法,关于DQN的详细介绍请参考本书第6章的内容,本章主要介绍在Gym-WAF中如何使用DQNAgent。

首先定义创建深度学习网络的函数,input_shape代表输入的特征向量的维度,layers代表深度学习网络的各层层数,nb_actions代表动作空间的大小,由于在本例中动作空间是有限的离散值,所以nb_actions事实上也就是动作的个数,同时也是深度学习网络输出层节点数。这里深度学习网络使用的是多层感知机(MLP),所以直接指定层数即可,本例中建议使用两层隐藏层,节点数分别为5和2,代码如下:


def generate_dense_model(input_shape, layers, nb_actions):
    model = Sequential()
    model.add(Flatten(input_shape=input_shape))
    model.add(Dropout(0.1))  
    for layer in layers:
        model.add(Dense(layer))
        model.add(BatchNormalization())
        model.add(ELU(alpha=1.0))
    model.add(Dense(nb_actions))
    model.add(Activation('linear'))
    return model

然后我们初始化Gym环境,获取环境env的动作空间大小:


ENV_NAME = 'Waf-v0' 
env = gym.make(ENV_NAME)
nb_actions = env.action_space.n
window_length = 1 

创建DQNAgent的深度学习网络:


model = generate_dense_model((window_length,) + env.observation_space.shape, layers, nb_actions)

创建策略对象policy,这里使用的是玻尔兹曼算法:


policy = BoltzmannQPolicy()

创建记忆体,大小为256:


memory = SequentialMemory(limit=256, ignore_episode_boundaries=False, window_length=window_length)

创建DQNAgent对象agent,指定使用的深度学习网络、动作空间大小、记忆体,使用的策略和批处理大小等参数:


agent = DQNAgent(model=model, nb_actions=nb_actions, memory=memory, nb_steps_warmup=16,enable_double_dqn=True,enable_dueling_network=True, dueling_type='avg',arget_model_update=1e-2, policy=policy, batch_size=16)

编译agent中的深度学习网络并开始学习,学习的总步数为rounds,其中非常重要的一个参数是nb_max_episode_steps,设置它主要是为了保证在学习过程中有一种退出机制,如果超过阈值可以自动退出,避免在一轮学习中因为异常情况一直学习,影响其他轮学习,代码如下:


agent.compile(RMSprop(lr=1e-3), metrics=['mae'])
agent.fit(env, nb_steps=rounds, visualize=False, verbose=2,nb_max_episode_steps=nb_max_episode_steps_train)

11.4.4 WafEnv_v0类

WafEnv_v0类基于OpenAI Gym框架,实现了强化学习中环境的主要功能。WafEnv_v0在初始化阶段加载了XSS样本文件,并且随机划分成了训练样本和测试样本,其测试样本占40%,代码如下:


samples_file="xss-samples-all.txt"
samples=[]
with open(samples_file) as f:
    for line in f:
        line = line.strip('\n')
        print "Add xss sample:" + line
        samples.append(line)
# 划分训练和测试集合
samples_train, samples_test = train_test_split(samples, test_size=0.4)

定义了动作转换表ACTION_LOOKUP:


ACTION_LOOKUP = {i: act for i, act in
enumerate(Xss_Manipulator.ACTION_TABLE.keys())}

初始化了动作空间、当前样本、获取特征的对象features_extra、用于检测XSS的waf_checker以及用于修改样本的xss_manipulatorer,代码如下:


self.action_space = spaces.Discrete(len(ACTION_LOOKUP))
#当前处理的样本
self.current_sample=""
self.features_extra=Features()
self.waf_checker=Waf_Check()
#根据动作修改当前样本免杀
self.xss_manipulatorer= Xss_Manipulator()

涉及的重要的函数介绍如下。

1.Step

Step函数根据输入的动作序号,针对当前的样本进行修改,然后再检测是否是XSS,如果不是XSS,说明免杀成功,回馈10,标记此轮学习完成;反之反馈0,继续学习,代码如下:


def _step(self, action):
    r=0
    is_gameover=False
    _action=ACTION_LOOKUP[action]
   self.current_sample=
   self.xss_manipulatorer.modify(self.current_sample,_action)
       if not self.waf_checker.check_xss(self.current_sample):
        #给奖励
        r=10
        is_gameover=True
        print "Good!!!!!!!avoid waf:%s" % self.current_sample
    self.observation_space=self.features_extra.extract(self.current_sample)
    return self.observation_space, r,is_gameover,{}

2.Reset

Reset函数负责重置环境,从样本列表中随机选择一个作为当前样本,并转换成对应的特征向量,作为初始状态,代码如下:


def _reset(self):
    self.current_sample=random.choice(samples_train)
    print "reset current_sample=" + self.current_sample
    self.observation_space=self.features_extra.extract(self.current_sample)
    return self.observation_space

11.4.5 Waf_Check类

Waf_Check类实现了针对字符串的XSS检测功能。Waf_Check在初始化函数里面定义了检测规则,该规则是一个非常简单的实现,有兴趣的读者可以参考11.2节“常见XSS防御方式”自行丰富,代码如下:


self.regXSS=r'(prompt|alert|confirm|expression])' \
            r'|(javascript|script|eval)' \
            r'|(onload|onerror|onfocus|onclick|ontoggle|onmousemove|ondrag)' \
            r'|(String.fromCharCode)' \
            r'|(;base64,)' \
            r'|(onblur=write)' \
            r'|(xlink:href)' \
            r'|(color=)'

Waf_Check类通过check_xss函数针对字符串进行检测,如果满足规则就识别为XSS,整个匹配过程都是忽略大小写的,代码如下:


def check_xss(self,str):
    isxss=False
    #忽略大小写
    if re.search(self.regXSS,str,re.IGNORECASE):
        isxss=True
    return isxss