前段时间刷短视频上说,如果面试官问你:“有一亿条文本文件数据(如:csv)导入到数据库,你会用什么办法?”我的第一想法是直接写个程序,逐行读取,批次插入,也就几十行的代码搞定了。
今天视频照进现实了,一个小伙伴问我,他们要ip地址查询归属地,购买了离线数据包数据量上千万条,格式类似csv,要导入mongodb数据库(游戏项目,用这个数据库的比较多,主打一个灵活,能快速应对脑洞大开的策划时不时的搞活动加属性等)。
一、数据格式
| IP段起始 | IP段结束 | 归属地 | 网络 |
|---|---|---|---|
| 1.0.0.0 | 1.0.0.255 | 泛播 | Cloudflare 数据中心 |
| 1.0.1.0 | 1.0.3.255 | 中国福建省福州市 | 电信 |
| 1.0.4.0 | 1.0.7.255 | 澳大利亚维多利亚墨尔本 |
IP段起始,IP段结束,归属地,网络
1.0.0.0,1.0.0.255,泛播,Cloudflare 数据中心
1.0.1.0,1.0.3.255,中国福建省福州市,电信
1.0.4.0,1.0.7.255,澳大利亚维多利亚墨尔本,
...
二、设计思路
1. 数据分析
首先可以将ipv4地址转成uint32类型,每条的ip地址区间是不重叠的。MongoDB 本身不支持“区间索引”或“范围包含查询”的最优结构(不像 PostgreSQL 的 range 类型 + GiST 索引),但可以通过合理建模 + 索引优化实现高效查询。只创建开始IP的索引,使用$lt + 排序,逻辑层再判断 ip值小于结束地址,就能实现指数性能的时间查询。
findOne({start ≤ x}, sort: start:-1) + 应用层校验
2. 数据插入
数据量有点大,可以逐行读取,批量插入,每次插入 1000~10000 条,减少网络往返,等导入完数据后,再创建索引,这样会大幅度提升写入速度。
三、 Go 代码实现
package main
import (
"bufio"
"context"
"fmt"
"log"
"net"
"os"
"strings"
"time"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)
const batchSize = 5000 // 每批插入数量(建议 1k~10k)
type IpInfo struct {
Start uint32 `bson:"start"` // 开始地址字节
End uint32 `bson:"end"` // 结束地址字节
Origin string `bson:"origin"` // 归属地
Isp string `bson:"isp"` // 运营商
}
func main() {
// 1. 连接 MongoDB
client, err := mongo.Connect(context.TODO(), options.Client().ApplyURI("mongodb://root:123456@localhost:27017"))
if err != nil {
log.Fatal(err)
}
defer client.Disconnect(context.TODO())
coll := client.Database("ip").Collection("local")
// 2. 清空旧数据(可选)
_, _ = coll.DeleteMany(context.TODO(), bson.D{})
// 3. 打开文件
file, err := os.Open("data.csv")
if err != nil {
log.Fatal(err)
}
defer file.Close()
scanner := bufio.NewScanner(file)
var batch []mongo.WriteModel
count := 0
startTime := time.Now()
for scanner.Scan() {
line := strings.TrimSpace(scanner.Text())
if line == "" {
continue
}
parts := strings.Split(line, ",")
if len(parts) < 4 {
log.Printf("跳过无效行: %s", line)
continue
}
start := parseIP(parts[0])
end := parseIP(parts[1])
if start == 0 || end == 0 {
log.Printf("解析失败IP: %s", line)
continue
}
doc := &IpInfo{
Start: start,
End: end,
Origin: parts[2],
Isp: parts[3],
}
batch = append(batch, mongo.NewInsertOneModel().SetDocument(doc))
count++
// 达到批次大小,执行批量插入
if len(batch) >= batchSize {
_, err := coll.BulkWrite(context.TODO(), batch, options.BulkWrite().SetOrdered(false))
if err != nil {
log.Fatalf("批量插入失败: %v", err)
}
fmt.Printf("已导入 %d 行\n", count)
batch = batch[:0] // 重用切片,避免频繁分配
}
}
// 插入剩余数据
if len(batch) > 0 {
_, err := coll.BulkWrite(context.TODO(), batch, options.BulkWrite().SetOrdered(false))
if err != nil {
log.Fatalf("最后一批插入失败: %v", err)
}
}
elapsed := time.Since(startTime)
fmt.Printf("总共导入 %d 行,耗时: %v\n", count, elapsed)
// 4. 导入完成后创建索引
fmt.Println("正在创建索引...")
_, err = coll.Indexes().CreateOne(context.TODO(), mongo.IndexModel{
Keys: bson.D{{"start", 1}},
Options: options.Index(), // 默认非唯一,无需额外设置
})
if err != nil {
log.Fatal("创建索引失败:", err)
}
fmt.Println("索引创建完成!")
// 测试查询
var testIpInfo IpInfo
filter := bson.M{"start": bson.M{"$lte": ip}}
opts := options.FindOne().SetSort(bson.D{{"start", -1}})
err = coll.FindOne(context.TODO(), filter, opts).Decode(&testIpInfo)
if err != nil {
log.Printf("查询失败: %v", err)
return
}
if ip > testIpInfo.End {
log.Printf("超出结束区间")
return
}
log.Printf("%s 归属地为 %s", testIp, testIpInfo.Origin)
}
// 将ipv4字符串解析成uint32类型,非ipv4地址返回0
func parseIP(ipstr string) uint32 {
ip := net.ParseIP(ipstr)
if ip == nil {
// 解析ip地址失败
return 0
}
ip = ip.To4()
if ip == nil {
// 非ipv4 地址
return 0
}
return uint32(ip[0])<<24 | uint32(ip[1])<<16 | uint32(ip[2])<<8 | uint32(ip[3])
}
四、测试
在阿里云2核4G ESSD服务器(go程序和mongodb都在同一服务器上)
导入测试:

- CPU 使用率:80%左右
- 内存:稳定在 1.4GB
查询测试
一万次查询耗时 7.005735651s
评论区