啥是CodeQL?

Code Query Language

CodeQL是Github Security Lab推出的一个基于语义的代码分析引擎,使用CodeQL可以让我们像对待数据一样去查询代码。通过编写查询条件来查找漏洞的所有变体以完全消除这个漏洞,并与他人分享漏洞查询语句

可以称之为基于模板的自动化变异分析

工作流程

  1. 将代码创建成数据库
  2. 编写QL查询从数据库中查询代码
  3. 解释查询结果

工作原理

数据库创建

使用语言相关的extractor从代码中提取抽象语法树(ast)、名称绑定的语义和类型信息,把源代码转化成单关系表示(single relational representation),以CodeQL数据库存储

执行查询

使用CodeQL专门设计的面向对象语言QL来查询此前创建的数据库

查询结果

将查询结果对应到源代码的上下文中去,即通过查询结果的解释找到源码中我们所关心的潜在漏洞

语义分析大概就是在ast中通过source正向找sink或通过sink回溯source,把审计变成路径查找

代码数据库

CodeQL 数据库包含从代码库中提取的可查询数据, 包含代码的完整、分层表示,包括抽象语法树、数据流图和控制流图的表示

每种语言都有自己独特的数据库模式,用于定义创建数据库的关系。该图为提取过程中的初始词汇分析与使用 CodeQL 的实际复杂分析提供了界面

对于每种语言,CodeQL 库定义类,以在数据库表上提供一层抽象

使用CodeQL

securitylab.github.com/tools/codeql

github security lab提供了一些教学项目以推广CodeQL,还举办有基于CodeQL的夺旗赛 https://securitylab.github.com/ctf/

更提供了 LGTM ,开箱即用的在线CodeQL环境

这里基于Visual Studio Code搭建离线的CodeQL环境

相关文档

安装CodeQL

  1. 安装Visual Studio Code
  2. 安装Visual Studio Code的CodeQL插件
  3. 按照 CodeQL 工作空间 完成剩下设置

代码数据库的获得

codeql database create –language=cpp –command= ```

针对某个chromium模块进行分析的办法

  1. 编译完整chromium

  2. 进入obj目录删除想要分析的模块的obj文件

  3. 执行以下命令

    gn gen out/ql && codeql database create <targetFolder> --language=cpp --Command=' ninja -C out/ql chrome'
    

    建立的输出文件夹结构

    - log\                # 输出的日志信息
    - db-cpp\             # 编译的数据库
    - src.zip             # 编译所对应的目标源码
    - codeql-database.yml # 数据库相关配置
    

    这部分可参考 CodeQL分析项目

初探CodeQL (Uboot)

GitHub Security Lab CTF 2: U-Boot Challenge | GitHub Security Lab

CodeQL的使用主要在于QL语句的编写,这里先从Github提供的入门项目Uboot Challenge开始熟悉

这个uboot项目基于对U-Boot NFS RCE漏洞的挖掘 简化

在这个项目中我们逐步完善QL语句,编写查询寻找一种RCE的模式

即 memcpy的size参数来自通过ntoh系列函数接受的远程输入

  • sink -> memcpy
  • source -> ntoh

只要在dataflow中找到两者参数间的可达路径,则说明远程可控memcpy的size参数,即可造成内存溢出最终导致RCE

Step 3 运行CodeQL

import cpp

from Function f
where f.getName() = "strlen"
select f, "a function named strlen"

从Function类型表中使用getName方法查找所有名称为strlen的函数定义

Step 4 剖析一个查询

一个查询的结构大致为

import /* ... 引用 CodeQL 库 ... */

from /* ... 变量声明 ... */
where /* ... 有关变量的逻辑表达式(约束条件) ... */
select /* ... 输出表达式 ... */

编写语句查找memcpy函数的定义

import cpp

from Function f
where f.getName() = "memcpy"
select f, "a function named memcpy"

Step 5 使用不同类型及对应谓词

编写语句查找ntohs、ntohl和ntohll宏的定义

引入了新的 Macro类,并使用正则匹配

import cpp 

from Macro m
where m.getName().regexpMatch("ntoh(s|l|ll)")
select m,m.getName()

Step 6 关联两个变量

编写语句查找所有的memcpy被调用

使用多个变量关联描述复杂代码关系

新的FunctionCall类

import cpp

from FunctionCall c, Function f
where c.getTarget() = f and f.getName() = "memcpy"
select c

事实上可以简化为单变量的约束

import cpp

from FunctionCall c
where c.getTarget().getName() = "memcpy"
select c

Step 7 继续关联两个变量

查找所有的ntoh*宏的调用

这里就直接继续用单变量方法了

import cpp

from MacroInvocation mi
where mi.getMacroName().regexpMatch("ntoh(s|l|ll)")
select mi

Step 8 修改查找结果的输出

查找这些宏调用所扩展到的顶级表达式

使用getExpr()谓词来返回我们要的表达式

import cpp

from MacroInvocation mi
where mi.getMacro().getName().regexpMatch("ntoh(s|l|ll)")
select mi.getExpr()

Step 9 编写自己的类

用exists引入临时变量,设置一个类的数据集合,声明时同名特征谓词会被调用以确定类的中的数据范围

由模板

import cpp

class NetworkByteSwap extends Expr {
  NetworkByteSwap () {
    // TODO: replace <class> and <var>
    exists(<class> <var> |
      // TODO: <condition>
    )
  } 
}

from NetworkByteSwap n
select n, "Network byte swap" 

得到

import cpp

class NetworkByteSwap extends Expr {
  NetworkByteSwap() {
    exists(MacroInvocation mi |
      mi.getMacroName().regexpMatch("ntoh(s|l|ll)") and
      this = mi.getExpr()
    )
  }
}

from NetworkByteSwap n
select n

Step 10 数据流分析

实现一个典型的污点分析

拓展TaintTracking类,覆盖添加isSource和isSink来约束我们需要查找的数据路径,从两个dataflow间搜索这样的路径

靠这个方法,可以从提供的uboot版本数据库中搜索出11个结果,即11个CVE !

import cpp
import semmle.code.cpp.dataflow.TaintTracking
import DataFlow::PathGraph


class NetworkByteSwap extends Expr {
  NetworkByteSwap() {
    exists(MacroInvocation mi |
      mi.getMacroName().regexpMatch("ntoh(s|l|ll)") and
      this = mi.getExpr()
    )
  }
}

class Config extends TaintTracking::Configuration {
  Config() { this = "Config: this name doesn't matter" }

  override predicate isSource(DataFlow::Node source) { source.asExpr() instanceof NetworkByteSwap }
  
  override predicate isSink(DataFlow::Node sink) {
    exists(FunctionCall c | c.getTarget().getName() = "memcpy" and sink.asExpr() = c.getArgument(2))
  }
}

from Config cfg, DataFlow::PathNode source, DataFlow::PathNode sink
where cfg.hasFlowPath(source, sink)
select sink, source, sink, "Network byte swap flows to memcpy"

More Detail

https://securitylab.github.com/research/uboot-rce-nfs-vulnerability/

完整的查询

import cpp

import semmle.code.cpp.dataflow.TaintTracking
import semmle.code.cpp.rangeanalysis.SimpleRangeAnalysis

class NetworkByteOrderTranslation extends Expr {
  NetworkByteOrderTranslation() {
    // On Windows, there are ntoh* functions.
    this.(Call).getTarget().getName().regexpMatch("ntoh(l|ll|s)")
    or
    // On Linux, and in some code bases, these are defined as macros.
    this = any(MacroInvocation mi |
        mi.getOutermostMacroAccess().getMacroName().regexpMatch("(?i)(^|.*_)ntoh(l|ll|s)")
      ).getExpr()
  }
}

class NetworkToMemFuncLength extends TaintTracking::Configuration {
  NetworkToMemFuncLength() { this = "NetworkToMemFuncLength" }

  override predicate isSource(DataFlow::Node source) {
       source.asExpr() instanceof NetworkByteOrderTranslation
  }
  
  override predicate isSink(DataFlow::Node sink) {
    exists (FunctionCall fc |
        fc.getTarget().getName().regexpMatch("memcpy|memmove") and
        fc.getArgument(2) = sink.asExpr() )
  } 
 
}

from Expr ntoh, Expr sizeArg, NetworkToMemFuncLength config
where config.hasFlow(DataFlow::exprNode(ntoh), DataFlow::exprNode(sizeArg))
select ntoh.getLocation(), sizeArg

除了这种模式,作者继续分析了其他变体

再探CodeQL(Segv)

GitHub Security Lab CTF 1: SEGV hunt | GitHub Security Lab

这个项目的目标是找到 GNU C Library (glibc) 中所有对alloca的不安全调用

alloca不会检查栈空间是否足够,如果申请的空间大于栈大小,则会返回一个非法指针导致程序SIGSEGV崩溃

使用生成好的glibc代码数据库

Step 0 查找alloca的本体

import cpp 

from Macro m
where m.getName().regexpMatch("alloca")
select m,m.getFile().toString()

得到"__builtin_alloca"

Step 1 查找所有对alloca的调用并过滤掉参数较小的调用

import cpp
import semmle.code.cpp.rangeanalysis.SimpleRangeAnalysis

from FunctionCall fc
where fc.getTarget().hasQualifiedName("__builtin_alloca")
    and (   upperBound(fc.getArgument(0).getFullyConverted()) >= 65536
        or  lowerBound(fc.getArgument(0).getFullyConverted()) < 0
        )
select fc,fc.getFile().toString()+":"+fc.getLocation().getStartLine()

Step 2 过滤掉由__libc_use_alloca保证安全的alloca调用

2.0 找出__libc_use_alloca调用
import cpp
import semmle.code.cpp.rangeanalysis.SimpleRangeAnalysis

from FunctionCall fc
where fc.getTarget().hasQualifiedName("__libc_use_alloca")
select fc,fc.getFile().toString()+":"+fc.getLocation().getStartLine()
2.1 找到调用__libc_use_alloca的GuardCondition
import cpp
import semmle.code.cpp.controlflow.Guards

from FunctionCall fc, GuardCondition gc, FunctionCall fc2
where fc.getTarget().hasQualifiedName("__builtin_alloca")
    and fc2.getTarget().hasQualifiedName("__libc_use_alloca")
    and gc.controls(fc.getBasicBlock(), _)
    and gc.getAChild()=fc2
select gc,gc.getFile().toString()+":"+gc.getLocation().getStartLine()

p.s. 这个查询效果出乎意料地渣

2.2 使用local dataflow查询__libc_use_alloca结果被赋值的情况
import cpp
import semmle.code.cpp.controlflow.Guards
import semmle.code.cpp.dataflow.DataFlow

from FunctionCall fc, GuardCondition gc, FunctionCall fc2, DataFlow::Node source, DataFlow::Node sink
where fc.getTarget().hasQualifiedName("__builtin_alloca")
    and fc2.getTarget().hasQualifiedName("__libc_use_alloca")
    and gc.controls(fc.getBasicBlock(), _)
    and DataFlow::localFlow(source,sink)
    and source.asExpr() = fc2
    and sink.asExpr() = gc
select gc,gc.getFile().toString()+":"+gc.getLocation().getStartLine()
2.3 过渡闭包找到__libc_use_alloca被的结果被其他宏封装的情况

Transitive Closure

import cpp
import semmle.code.cpp.controlflow.Guards
import semmle.code.cpp.dataflow.DataFlow

from FunctionCall fc, GuardCondition gc, FunctionCall fc2, DataFlow::Node source, DataFlow::Node sink
where fc.getTarget().hasQualifiedName("__builtin_alloca")
    and fc2.getTarget().hasQualifiedName("__libc_use_alloca")
    and gc.controls(fc.getBasicBlock(), _)
    and DataFlow::localFlow(source,sink)
    and source.asExpr() = fc2
    and sink.asExpr() = gc.getAChild*()
select gc,gc.getFile().toString()+":"+gc.getLocation().getStartLine()
2.4 __libc_use_alloca的否定情况

扩大上下游基础块的查找

ControlFlowNode

import cpp
import semmle.code.cpp.controlflow.Guards
import semmle.code.cpp.dataflow.DataFlow

from FunctionCall fc1, GuardCondition gc, FunctionCall fc2, 
     DataFlow::Node source, DataFlow::Node sink,
     BasicBlock bb1, BasicBlock bb2
where fc1.getTarget().hasQualifiedName("__builtin_alloca")
    and fc2.getTarget().hasQualifiedName("__libc_use_alloca")
    and bb1.contains(fc1)
    and bb2.contains(fc2)
    and gc.controls(bb1, _)
    and DataFlow::localFlow(source,sink)
    and source.asExpr() = bb2.getANode()
    and sink.asExpr() = gc.getAChild*()
select gc,gc.getFile().toString()+":"+gc.getLocation().getStartLine()
2.5 找到由__libc_use_alloca保证安全的alloca调用

把输出解释到alloca上

顺便做些封装以便接下来的使用

import cpp
import semmle.code.cpp.controlflow.Guards
import semmle.code.cpp.dataflow.DataFlow

string getPos(Expr f){
    result = f.getFile().toString()+":" + f.getLocation().getStartLine()
}

predicate isSafeAllocaCall(FunctionCall allocaCall){
    exists(FunctionCall fc, DataFlow::Node source, DataFlow::Node sink, GuardCondition gc, BasicBlock bb |
        fc.getTarget().hasQualifiedName("__libc_use_alloca") and
        bb.contains(fc) and
        gc.controls(allocaCall.getBasicBlock(), _) and
        source.asExpr() = bb.getANode() and
        sink.asExpr() = gc.getAChild*()  and
        DataFlow::localFlow(source,sink)
    )
}

from FunctionCall fc
where fc.getTarget().hasQualifiedName("__builtin_alloca") 
    and isSafeAllocaCall(fc)
select fc,getPos(fc)

Step 3 结合前面两步

import cpp
import semmle.code.cpp.controlflow.Guards
import semmle.code.cpp.dataflow.DataFlow
import semmle.code.cpp.rangeanalysis.SimpleRangeAnalysis

string getPos(Expr f){
    result = f.getFile().toString()+":" + f.getLocation().getStartLine()
}

predicate isSafeAllocaCall(FunctionCall allocaCall){
    exists(FunctionCall fc, DataFlow::Node source, DataFlow::Node sink, GuardCondition gc, BasicBlock bb |
        fc.getTarget().hasQualifiedName("__libc_use_alloca") and
        bb.contains(fc) and
        gc.controls(allocaCall.getBasicBlock(), _) and
        source.asExpr() = bb.getANode() and
        sink.asExpr() = gc.getAChild*()  and
        DataFlow::localFlow(source,sink)
    )
}

predicate isOoBAllocaCall(FunctionCall allocaCall){
    exists(Expr sizeArg|
        sizeArg=allocaCall.getArgument(0).getFullyConverted() and
        (upperBound(sizeArg) >= 65536 or lowerBound(sizeArg) < 0)
    )
}

from FunctionCall fc
where fc.getTarget().hasQualifiedName("__builtin_alloca") 
    and not isSafeAllocaCall(fc)
    and isOoBAllocaCall(fc)
select fc,getPos(fc)

Step 4 污点分析查找size可控的危险alloca

taint tracking

主要输入为文件,即追踪size来自fopen结果指针的alloca

sink是危险的alloca调用

source是fopen调用

寻找二者的可达路径

4.0 查找 fopen 的本体
import cpp 

from Macro m
where m.getName().regexpMatch("fopen")
select m,m.getFile().toString()

得到 "_IO_new_fopen"

4.1 完成污点分析

/**
  * @name 41_fopen_to_alloca_taint
  * @description Track taint from fopen to alloca.
  * @kind path-problem
  * @problem.severity warning
  */

import cpp
import semmle.code.cpp.rangeanalysis.SimpleRangeAnalysis
import semmle.code.cpp.dataflow.TaintTracking
import semmle.code.cpp.models.interfaces.DataFlow
import semmle.code.cpp.controlflow.Guards
import semmle.code.cpp.dataflow.DataFlow
import DataFlow::PathGraph

string getPos(Expr f){
  result = f.getFile().toString()+":" + f.getLocation().getStartLine()
}

predicate isSafeAllocaCall(FunctionCall allocaCall) {
  exists(FunctionCall fc, DataFlow::Node source, DataFlow::Node sink, GuardCondition guard, BasicBlock block |
    fc.getTarget().hasQualifiedName("__libc_use_alloca") and
    guard.controls(allocaCall.getBasicBlock(), _) and
    DataFlow::localFlow(source, sink) and
    block.contains(fc) and
    source.asExpr() = block.getANode() and
    sink.asExpr() = guard.getAChild*()
  )
}

predicate isOOBAllocaCall(FunctionCall allocaCall) {
  exists(Expr sizeArg | 
    sizeArg = allocaCall.getArgument(0).getFullyConverted() and
    (upperBound(sizeArg) >= 65536 or lowerBound(sizeArg) < 0)
  )
}

 // Track taint through `__strnlen`.
 class StrlenFunction extends DataFlowFunction {
   StrlenFunction() { this.getName().matches("%str%len%") }
 
   override predicate hasDataFlow(FunctionInput i, FunctionOutput o) {
     i.isInParameter(0) and o.isOutReturnValue()
   }
 }
 
 // Track taint through `__getdelim`.
 class GetDelimFunction extends DataFlowFunction {
   GetDelimFunction() { this.getName().matches("%get%delim%") }
 
   override predicate hasDataFlow(FunctionInput i, FunctionOutput o) {
     i.isInParameter(3) and o.isOutParameterPointer(0)
   }
 }
 
 class Config extends TaintTracking::Configuration {
   Config() { this = "fopen_to_alloca_taint" }
 
   override predicate isSource(DataFlow::Node source) {
      exists(FunctionCall fopenCall|
        fopenCall.getTarget().hasQualifiedName("_IO_new_fopen") and
        source.asExpr() = fopenCall
      )
   }
 
   override predicate isSink(DataFlow::Node sink) {
     exists(
       Expr sizeArg, FunctionCall allocaCall |
       allocaCall.getTarget().hasQualifiedName("__builtin_alloca") and
       not isSafeAllocaCall(allocaCall) and 
       isOOBAllocaCall(allocaCall) and 
       sizeArg=allocaCall.getArgument(0).getFullyConverted() and
       sink.asExpr() = sizeArg
     )
   }
 }
 
 from Config cfg, DataFlow::PathNode source, DataFlow::PathNode sink
 where cfg.hasFlowPath(source, sink)
 select sink, source, getPos(sink.getNode().asExpr()), "fopen flows to alloca"

Step 5 根据结果编写Poc

简单分析一个Crash Path

fopen flows to alloca	gconv_conf.c:323:25
Path
1	call to _IO_new_fopen 	gconv_conf.c:369:14
2	rp 	gconv_conf.c:418:14
3	rp 	gconv_conf.c:250:19
4	... + ... 	gconv_conf.c:323:25

源头是 read_conf_file 里从 filename 打开的 fp 文件句柄,

解析文件字节流时使用的 (char * )rp 在解析conf文件中的module块时会进入 add_module 并把rp作为实参

/* Read the next configuration file.  */
static void
read_conf_file (const char *filename, const char *directory, size_t dir_len,
		void **modules, size_t *nmodules)
{
  /* Note the file is opened with cancellation in the I/O functions
     disabled.  */
  FILE *fp = fopen (filename, "rce");
  ...
      if (rp - word == sizeof ("alias") - 1
	  && memcmp (word, "alias", sizeof ("alias") - 1) == 0)
	add_alias (rp, *modules);
      else if (rp - word == sizeof ("module") - 1
	       && memcmp (word, "module", sizeof ("module") - 1) == 0)
	add_module (rp, directory, dir_len, modules, nmodules, modcounter++);
      /* else */
	/* Otherwise ignore the line.  */
    }

  free (line);

  fclose (fp);
}

在add_module里rp形参被赋给了from

/* Add new module.  */
static void
add_module (char *rp, const char *directory, size_t dir_len, void **modules,
	    size_t *nmodules, int modcounter)
{
  /* We expect now
     1. `from' name
     2. `to' name
     3. filename of the module
     4. an optional cost value
  */
  struct gconv_alias fake_alias;
  struct gconv_module *new_module;
  char *from, *to, *module, *wp;
  int need_ext;
  int cost_hi;

  while (__isspace_l (*rp, _nl_C_locobj_ptr))
    ++rp;
  from = rp;
    ...
    /* See whether we have already an alias with this name defined.  */
  fake_alias.fromname = strndupa (from, to - from);
    ...
}

to-from作为strndupa的size实参

而strndupa是使用了alloca实现的strndup,存在_libc_use_alloca保证安全的alloca调用,因此在to-from也就是,即conf 中 module name 极端长的情况下会程序会crash (SIGSEV)

其实基本没利用价值 233

CodeQL编写

从以上两个CodeQL项目里基本学习了使用CodeQL编写的主要方法

需要经常翻阅的文档

QL language reference — CodeQL (github.com)

QL CPP API

更简单的练习题

https://help.semmle.com/QL/ql-training/cpp/snprintf.html

更多学习案例

https://github.com/github/securitylab/tree/main/CodeQL_Queries/cpp

大量现成的CodeQL查询

https://lgtm.com/search?q=language%3Acpp&t=rules

总结

CodeQL 集成了 控制流分析、数据流分析、污染跟踪、范围分析 从而可以使用它进行高效的变异分析

在完全理解一个新的攻击向量后即可通过CodeQL大规模扫描这种漏洞在项目内甚至于项目外的存在性,一网打尽

在一些用户量大,研究资源集中的安全焦点项目,被大量fuzz后总会产出一些复杂的漏洞模式,有些就可以通过CodeQL快速应用到其他的项目上

可以说变异分析是完全基于经验的静态分析方法,而Fuzzing的随机化过程使其是不那么依靠经验的动态分析方法,二者是存在互补的,LGTM平台可以将CodeQL查询和告警集成到开发流程中,而OSSfuzz和OneFuzz等云计算Fuzzing平台也将Fuzzing技术集成到开发流程中去

参考

https://kiprey.github.io/2020/12/CodeQL-setup/

https://www.4hou.com/posts/o6ok

https://null2root.github.io/blog/2021/02/12/GitHub-Security-Lab-CTF-1_SEGV-hunt-writeup.html