【AI达人特训营】基于飞桨实现系统认证风险预测-异常检测

时间:2025-08-09 09:56:03

该项目基于百度飞桨实现系统认证风险预测的异常检测。首先对用户认证行为数据进行处理,提取特征如时间差、不同维度的统计量等。构建含多层全连接层的分类模型,并使用KL损失函数进行训练和验证,评估模型性能。最终生成预测结果,用于判断用户认证行为是否有风险。

【AI达人特训营】系统认证风险预测-异常检测

- 本项目采用百度飞桨PaddlePaddle构建系统认证风险预测模型,优化特征工程与调整架构,提升线下F分。

一、赛题背景

随着国家、企业对安全和效率越来越重视,作为安全基础设施之一的统一身份管理(IAM,Identity and Access Management)也得到越来越多的关注。 在IAM领域中,其主要安全防护手段是身份鉴别,既我们常见的口令验证、扫码验证、指纹验证等。它们一般分为三类,既用户所知(如口令)、所有(如身份证)、特征(如人脸)。但这些身份鉴别方式都有其各自的优缺点,比如口令,强度高了记不住,强度低了容易丢,又比如人脸,摇头晃脑做活体检测体验不好,静默检测如果算法不够,又很容易被照片、视频、人脸模型绕过。也因此,在等保2.0中对于三级以上系统要求必须使用两种以上的身份鉴别方式进行身份验证,来提高身份鉴别的可信度,这也被成为双因素认证。

然而对于用户而言,在一定程度上提高了安全性的同时却极大降低了用户体验。因此,IAM厂商开始借鉴UEBA、用户画像等行为分析技术来寻找一种既能保证用户体验又能够提升身份认证强度的方法。目前在IAM的发展进程中,最有可能落地的方法是基于规则的行为分析技术。这种技术不仅具有很高的可理解性,并且容易与身份验证系统进行联动操作。

尽管基于规则也有局限性,但它主要依赖于经验来进行判断,这种方法虽然能有效防止错误的逮捕,但也可能导致错过真正的威胁。它没有从数据层面证明是否有未经授权或非法获取身份信息的行为,更不用说使用窃取的身份信息来提前进行风险预警和处理了。

更多内容请前往比赛官网查看:系统认证风险预测-异常检测

比赛任务

在这次竞赛中,参赛团队需要开发一个基于用户认证行为数据和风险异常标记结构的系统。为了实现这个目标,你们需要构建两个关键的模型:用户认证行为特征模型和风险异常评估模型。首先,使用监督学习技术,将用户认证行为作为输入特征数据来训练你的风险异常评估模型。这一步骤的关键在于准确捕捉用户在特定情境下的行为模式,并据此识别潜在的风险点。通过这种方法,你可以有效地预测和防范欺诈行为。然后,利用训练好的风险异常评估模型,对当前用户的认证行为进行实时风险分析。通过对历史认证行为数据的学习和总结,你的系统可以动态地调整其评估标准,确保它能够迅速而准确地判断出用户的实际风险水平。最后,请记住,这是一个不断优化和完善的过程。随着更多真实世界的数据输入,你的模型将变得更加精准和可靠。期待你们的创新解决方案,帮助保护网络安全,同时提高用户体验。

二、数据分析及处理

参赛数据源自竹云的风险分析产品日志库,主要包含认证与风险日志信息。经过数据脱敏和筛选处理后,供公众参考使用。其中的认证日志记录了用户在访问系统时的行为,如登录、单点登录及退出操作等。

该比赛使用的数据已上传至AI Studio:https://aistudio.baidu.com/aistudio/datasetdetail/112151 In [1]

!pip uninstall paddlepaddle -y登录后复制 In [2]

!pip install paddlepaddle登录后复制

数据概况

In [3]

import pandas as pdimport numpy as numpyimport json train = pd.read_csv('data/data112151/train_dataset.csv', sep='\t') test = pd.read_csv('data/data112151/test_dataset.csv', sep='\t') data = pd.concat([train, test])登录后复制 In [4]

index = 0data['ii'] = data['session_id'].apply(lambda x: int(x[-7:-5])) data['location_first_lvl'] = data['location'].astype(str).apply(lambda x: json.loads(x)['first_lvl']) data['location_sec_lvl'] = data['location'].astype(str).apply(lambda x: json.loads(x)['sec_lvl']) data['location_third_lvl'] = data['location'].astype(str).apply(lambda x: json.loads(x)['third_lvl']) data.drop(['client_type', 'browser_source'], axis=1, inplace=True) data['auth_type'].fillna('1', inplace=True)登录后复制 In [5]

from tqdm import tqdmfrom sklearn.preprocessing import LabelEncoderfor col in tqdm(['user_name', 'action', 'auth_type', 'ip', 'ip_location_type_keyword', 'ip_risk_level', 'location', 'device_model', 'os_type', 'os_version', 'browser_type', 'browser_version', 'bus_system_code', 'op_target', 'location_first_lvl', 'location_sec_lvl', 'location_third_lvl']): lbl = LabelEncoder() data[col] = lbl.fit_transform(data[col])登录后复制 In [6]

import numpy as np data['op_date'] = pd.to_datetime(data['op_date']) data['year'] = data['op_date'].dt.year data['month'] = data['op_date'].dt.month data['day'] = data['op_date'].dt.day data['hour'] = data['op_date'].dt.hour data['op_ts'] = data["op_date"].values.astype(np.int64) // 10 ** 9data = data.sort_values(by=['user_name', 'op_ts']).reset_index(drop=True) data['last_ts'] = data.groupby(['user_name'])['op_ts'].shift(1) data['ts_diff1'] = data['op_ts'] - data['last_ts']登录后复制 In [7]

for f in ['ip', 'location', 'device_model', 'os_version', 'browser_version']: data[f'user_{f}_nunique'] = data.groupby(['user_name'])[f].transform('nunique')for method in ['mean', 'max', 'min', 'std', 'sum', 'median']: for col in ['user_name', 'ip', 'location', 'device_model', 'os_version', 'browser_version']: data[f'ts_diff1_{method}_' + str(col)] = data.groupby(col)['ts_diff1'].transform(method)登录后复制 In [8]

group_list = ['user_name','ip', 'location', 'device_model', 'os_version', 'browser_version', 'op_target']登录后复制 In [9]

cat_cols = ['action', 'auth_type', 'browser_type', 'browser_version', 'bus_system_code', 'device_model', 'ip', 'ip_location_type_keyword', 'ip_risk_level', 'location', 'op_target', 'os_type', 'os_version', 'user_name' ]登录后复制 In [10]

data.drop(columns = 'session_id', inplace=True)登录后复制 In [11]

data.drop(columns='op_date',inplace=True)登录后复制 In [12]

features = [item for item in data.columns if item != 'risk_label'] traindata = data[~data['risk_label'].isnull()].reset_index(drop=True) traindata = traindata.fillna(0) testdata = data[data['risk_label'].isnull()].reset_index(drop=True)for item in features: testdata[item] = testdata[item].fillna(0)登录后复制 In [13]

data_X = traindata[features].values[:12000] data_Y = traindata['risk_label'].values[:12000].astype(int).reshape(-1, 1) data_X_test = traindata[features].values[12000:] data_Y_test = traindata['risk_label'].values[12000:].astype(int).reshape(-1, 1) testdata = testdata[features].values# 归一化from sklearn.preprocessing import MinMaxScaler mm = MinMaxScaler() data_X = mm.fit_transform(data_X) data_X_test = mm.fit_transform(data_X_test) testdata = mm.fit_transform(testdata)登录后复制 In [14]

testdata.shape登录后复制

三、模型组网

使用飞桨PaddlePaddle进行组网,在本基线系统中,只使用两层全连接层完成分类任务。 In [15]

import randomimport paddle seed = 1234# 设置随机种子 固定结果def set_seed(seed): np.random.seed(seed) random.seed(seed) paddle.seed(seed) set_seed(seed)登录后复制 In [16]

在动态图中,我们使用PaddlePaddle库创建了名为Classification的类。该类包含一个名为Tanh的激活函数,并定义了五个全连接层和相应的dropout层,用于模型训练。

四、配置参数及训练

记录日志

In [17]

# 定义绘制训练过程的损失值变化趋势的方法draw_train_processtrain_nums = [] train_costs = []def draw_train_process(iters,train_costs): title="training cost" plt.title(title, fontsize=24) plt.xlabel("iter", fontsize=14) plt.ylabel("cost", fontsize=14) plt.plot(iters, train_costs,color='red',label='training cost') plt.grid() plt.show()登录后复制

定义损失函数

损失函数使用【R-Drop:摘下SOTA的Dropout正则化策略】里的kl_loss: In [36]

import paddleimport paddle.nn.functional as Fclass kl_loss(paddle.nn.Layer): def __init__(self): super(kl_loss, self).__init__() def forward(self, p, q, label): ce_loss = 0.5 * (F.mse_loss(p, label=label)) + F.mse_loss(q, label=label) kl_loss = self.compute_kl_loss(p, q) # carefully choose hyper-parameters loss = ce_loss + 0.3 * kl_loss return loss def compute_kl_loss(self, p, q): p_loss = F.kl_div(F.log_softmax(p, axis=-1), F.softmax(q, axis=-1), reduction='none') q_loss = F.kl_div(F.log_softmax(q, axis=-1), F.softmax(p, axis=-1), reduction='none') # You can choose whether to use function "sum" and "mean" depending on your task p_loss = p_loss.sum() q_loss = q_loss.sum() loss = (p_loss + q_loss) / 2 return loss登录后复制

模型训练

In [37]

import paddle.nn.functional as F y_preds = [] labels_list = [] BATCH_SIZE = 8train_data = data_X train_data_y = data_Y test_data = data_X_test test_data_y = data_Y_test compute_kl_loss = kl_loss()def train(model): print('start training ... ') # 开启模型训练模式 model.train() EPOCH_NUM = 5 train_num = 0 scheduler = paddle.optimizer.lr.CosineAnnealingDecay(learning_rate=0.001, T_max=int(traindata.shape[0]/BATCH_SIZE*EPOCH_NUM), verbose=False) optimizer = paddle.optimizer.Adam(learning_rate=scheduler, parameters=model.parameters()) for epoch_id in range(EPOCH_NUM): # 在每轮迭代开始之前,将训练数据的顺序随机的打乱 np.random.shuffle(train_data) # 将训练数据进行拆分,每个batch包含50条数据 mini_batches = [np.append(train_data[k:k+BATCH_SIZE],train_data_y[k:k+BATCH_SIZE],axis=1)for k in range(0, len(train_data))] for batch_id, data in enumerate(mini_batches): features_np = np.array(data[:, :66], np.float32) labels_np = np.array(data[:, -1:], np.float32) features = paddle.to_tensor(features_np) labels = paddle.to_tensor(labels_np) # m = labels.flatten(0) # m = np.array(m) #前向计算 y_pred1 = model(features) # x= paddle.argmax(y_pred1,axis=-1) # x = np.array(x) # acc = acc+np.sum(m == x) # print(acc) y_pred2 = model(features) cost = compute_kl_loss(y_pred1, y_pred2, label=labels) # cost = F.mse_loss(y_pred, label=labels) train_cost = cost.numpy()[0] #反向传播 cost.backward() #最小化loss,更新参数 optimizer.step() # 清除梯度 optimizer.clear_grad() if batch_id % 500 == 0 and epoch_id % 1 == 0: print("Pass:%d,Cost:%0.5f"%(epoch_id, train_cost)) train_num = train_num + BATCH_SIZE train_nums.append(train_num) train_costs.append(train_cost) model = Classification() train(model)登录后复制 In [38]

def predict(model): print('start predicting') model.eval() outputs = [] mini_batches = [np.append(test_data[k:k+BATCH_SIZE],test_data_y[k:k+BATCH_SIZE],axis=1)for k in range(0,len(test_data),BATCH_SIZE)] for data in mini_batches: features_np = np.array(data[:,:66],np.float32) features = paddle.to_tensor(features_np) pred = model(features) out = paddle.argmax(pred,axis=1) outputs.extend(out.numpy()) return outputs outputs = predict(model)登录后复制 In [39]

test_data_y = test_data_y.reshape(-1,) outputs = np.array(outputs) np.sum(test_data_y==outputs)/test_data_y.shape[0]登录后复制 In [44]

from sklearn.metrics import f1_score x = f1_score(test_data_y,outputs) x登录后复制

绘制趋势曲线

In [40]

import matplotlibimport matplotlib.pyplot as pltimport warnings warnings.filterwarnings('ignore') %matplotlib inline draw_train_process(train_nums, train_costs)登录后复制

五、保存预测结果

模型预测: In [41]

predict_result = []for infer_feature in testdata: infer_feature = infer_feature.reshape(1,66) infer_feature = paddle.to_tensor(np.array(infer_feature, dtype='float32')) result = model(infer_feature) predict_result.append(result)登录后复制

将结果写入.CSV文件中: In [42]

import osimport pandas as pd id_list = [item for item in range(1, 10001)] label_list = [] csv_file = 'submission.csv'for item in range(len(id_list)): label = np.argmax(predict_result[item]) label_list.append(label) data = {'id':id_list, 'ret':label_list} df = pd.DataFrame(data) df.to_csv(csv_file, index=False, encoding='utf8')登录后复制

以上就是【AI达人特训营】基于飞桨实现系统认证风险预测-异常检测的详细内容,更多请关注其它相关文章!

相关下载

相关资讯

猜你喜欢

最新资讯

相关合集