用PySpark和PyTorch实现跨境支付Hive数据仓库的反洗钱数据分析

news/2025/2/27 5:19:41

一、数据仓库表结构

假设我们有两个主要表: transactions (交易表)和 customers (客户表)。

transactions 表

CREATE TABLE transactions (
    transaction_id STRING,
    customer_id STRING,
    counterparty_id STRING,
    transaction_amount DECIMAL(10, 2),
    transaction_time TIMESTAMP,
    transaction_location STRING,
    transaction_country STRING
)
PARTITIONED BY (year INT, month INT)
CLUSTERED BY (transaction_amount) INTO 4 BUCKETS;

customers 表

CREATE TABLE customers (
    customer_id STRING,
    customer_name STRING,
    customer_address STRING,
    customer_country STRING,
    risk_level INT
)

二、分析流程的PySpark语句

  1. 数据收集与整合

从不同数据源读取数据,假设数据存储在JSON文件中。

python">from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Anti - Money Laundering").getOrCreate()

# 读取交易数据
transactions_df = spark.read.json("path/to/transactions.json")
# 读取客户数据
customers_df = spark.read.json("path/to/customers.json")
  1. 数据清洗
python">from pyspark.sql.functions import col, when, row_number
from pyspark.sql.window import Window

# 去除交易数据中的重复记录
transactions_df = transactions_df.dropDuplicates()

# 处理交易数据中的缺失值,用0填充交易金额
transactions_df = transactions_df.fillna(0, subset=['transaction_amount'])

# 处理异常值,假设交易金额不能为负数,将负数标记为异常
transactions_df = transactions_df.withColumn("transaction_amount", when(col("transaction_amount") < 0, -1).otherwise(col("transaction_amount")))

# 去除客户数据中的重复记录
customers_df = customers_df.dropDuplicates()

# 处理客户数据中的缺失值,假设客户国家缺失用'Unknown'填充
customers_df = customers_df.fillna('Unknown', subset=['customer_country'])
  1. 数据集成
python"># 将交易数据和客户数据根据customer_id进行关联
joined_df = transactions_df.join(customers_df, on='customer_id', how='inner')
  1. 数据分析
python">from pyspark.sql.functions import count, sum, avg, col

# 设定金额阈值
amount_threshold = 100000
# 设定频率阈值,假设每天交易超过10次为频繁
frequency_threshold = 10

# 按客户ID和日期统计交易次数和总金额
daily_transaction_stats = joined_df.groupBy("customer_id", col("transaction_time").cast("date").alias("transaction_date")) \
  .agg(count("transaction_id").alias("transaction_count"), sum("transaction_amount").alias("total_amount"))

# 标记可疑交易
suspicious_transactions = daily_transaction_stats.filter((col("total_amount") > amount_threshold) | (col("transaction_count") > frequency_threshold))

三、用深度学习分析报告数据的PyTorch代码

假设我们将可疑交易数据转换为特征矩阵,用于深度学习模型训练。这里以简单的二分类(可疑或不可疑)为例。

python">import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import TensorDataset, DataLoader

# 假设suspicious_transactions_df是上述分析得到的可疑交易数据,转换为特征矩阵
# 这里简单假设特征为交易金额和交易次数
features = torch.tensor(suspicious_transactions_df.select("total_amount", "transaction_count").collect(), dtype=torch.float32)
labels = torch.tensor(suspicious_transactions_df.select("is_suspicious").collect(), dtype=torch.float32).view(-1)

# 创建数据集和数据加载器
dataset = TensorDataset(features, labels)
dataloader = DataLoader(dataset, batch_size=32, shuffle=True)

# 定义神经网络模型
class AntiMoneyLaunderingModel(nn.Module):
    def __init__(self):
        super(AntiMoneyLaunderingModel, self).__init__()
        self.fc1 = nn.Linear(2, 16)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(16, 1)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        out = self.fc1(x)
        out = self.relu(out)
        out = self.fc2(out)
        out = self.sigmoid(out)
        return out

model = AntiMoneyLaunderingModel()
criterion = nn.BCELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# 训练模型
for epoch in range(100):
    for i, (batch_features, batch_labels) in enumerate(dataloader):
        optimizer.zero_grad()
        outputs = model(batch_features)
        loss = criterion(outputs.view(-1), batch_labels)
        loss.backward()
        optimizer.step()

    if (epoch + 1) % 10 == 0:
        print(f'Epoch [{epoch + 1}/100], Loss: {loss.item():.4f}')

http://www.niftyadmin.cn/n/5869487.html

相关文章

使用前端 html css 和js 开发一个AI智能平台官网模板-前端静态页面项目

最近 AI 人工智能这么火&#xff0c;那必须针对AI 做一个 AI方面的 官方静态网站练手。让自己的前端技术更上一层楼&#xff0c;哈哈。 随着人工智能技术的不断发展&#xff0c;越来越多的AI应用开始渗透到各行各业&#xff0c;为不同领域的用户提供智能化解决方案。本网站致力…

C语言中的内存函数使用与模拟实现

目录 一、内存函数的使用 1、memcpy()函数 2、memmove()函数 3、memcpy()函数 4、memset()函数&#xff1a; 二、内存函数的模拟实现 1、模拟实现memcpy()函数 2、模拟实现memmove()函数 一、内存函数的使用 1、memcpy()函数 memcpy()函数可以指定字节数&#xff0c;把…

av_find_input_format 和 AVInputFormat 的关系

1. av_find_input_format 和 AVInputFormat 的关系 av_find_input_format 是 FFmpeg 中的一个函数&#xff0c;用于根据输入格式的名称&#xff08;如 "mp4"、"wav"、"avfoundation" 等&#xff09;查找对应的输入格式结构体 AVInputFormat。 …

Python学习第十七天之PyTorch保姆级安装

PyTorch安装与部署 一、准备工作二、pytorch介绍三、CPU版本pytorch安装1. 创建虚拟环境2. 删除虚拟环境1. 通过环境名称删除2. 通过环境路径删除 3. 配置镜像源4. 安装pytorch1. 首先激活环境变量2. 进入pytorch官网&#xff0c;找到安装指令 5. 验证pytorch是否安装成功 四、…

PyCharm社区版如何运行Django工程?

PyCharm 社区版虽然不像专业版那样提供对 Django 的直接支持&#xff0c;但仍然可以通过一些手动配置来运行 Django 工程。以下是详细的步骤&#xff1a; 步骤 1&#xff1a;安装 Django 确保你的环境中已经安装了 Django。如果没有安装&#xff0c;可以通过以下命令安装&…

NLP09-朴素贝叶斯问句分类(3/3)

首先有个问句分类类&#xff1a; class QuestionClassify: 以下均为该类中的属性。 def __init__(self):self.train_x Noneself.train_y Noneself.tfidf_vec Noneself.train_vec Noneself.model Noneself.question_category_dict None__init__ 是 Python 中的一个特殊方…

C++ ⾼性能内存池

目录 项⽬介绍 小知识点补充 定位new 英语单词&#xff1a; 什么是内存池 1.池化技术 2.内存池 3.内存池主要解决的问题 3.1 效率问题 3.2 碎片化 3.2.1 外碎片 4.了解一下malloc 先设计⼀个定⻓的内存池 New的实现 Delete的实现 性能测试 脱离malloc直接在…

「软件设计模式」命令模式(Command)

揭秘命令模式&#xff1a;用C实现智能家居的"万能遥控器" 一、从餐厅点餐看命令模式精髓 想象你坐在餐厅点餐时&#xff0c;服务员记录你的订单交给后厨&#xff0c;这个看似简单的过程蕴含着软件设计的智慧。命令模式&#xff08;Command&#xff09;正是将这种&quo…