目 录CONTENT

文章目录
Go

go实现千万条文本ip数据导入数据库并查询

过客
2025-12-22 / 0 评论 / 2 点赞 / 66 阅读 / 0 字

前段时间刷短视频上说,如果面试官问你:“有一亿条文本文件数据(如:csv)导入到数据库,你会用什么办法?”我的第一想法是直接写个程序,逐行读取,批次插入,也就几十行的代码搞定了。

今天视频照进现实了,一个小伙伴问我,他们要ip地址查询归属地,购买了离线数据包数据量上千万条,格式类似csv,要导入mongodb数据库(游戏项目,用这个数据库的比较多,主打一个灵活,能快速应对脑洞大开的策划时不时的搞活动加属性等)。

一、数据格式

IP段起始 IP段结束 归属地 网络
1.0.0.0 1.0.0.255 泛播 Cloudflare 数据中心
1.0.1.0 1.0.3.255 中国福建省福州市 电信
1.0.4.0 1.0.7.255 澳大利亚维多利亚墨尔本
IP段起始,IP段结束,归属地,网络
1.0.0.0,1.0.0.255,泛播,Cloudflare 数据中心
1.0.1.0,1.0.3.255,中国福建省福州市,电信
1.0.4.0,1.0.7.255,澳大利亚维多利亚墨尔本,
...

二、设计思路

1. 数据分析

首先可以将ipv4地址转成uint32类型,每条的ip地址区间是不重叠的。MongoDB 本身不支持“区间索引”或“范围包含查询”的最优结构(不像 PostgreSQL 的 range 类型 + GiST 索引),但可以通过合理建模 + 索引优化实现高效查询。只创建开始IP的索引,使用$lt + 排序,逻辑层再判断 ip值小于结束地址,就能实现指数性能的时间查询。

findOne({start ≤ x}, sort: start:-1) + 应用层校验

2. 数据插入

数据量有点大,可以逐行读取,批量插入,每次插入 1000~10000 条,减少网络往返,等导入完数据后,再创建索引,这样会大幅度提升写入速度。

三、 Go 代码实现

package main

import (
	"bufio"
	"context"
	"fmt"
	"log"
	"net"
	"os"
	"strings"
	"time"

	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/options"
)

const batchSize = 5000 // 每批插入数量(建议 1k~10k)

type IpInfo struct {
	Start  uint32 `bson:"start"`  // 开始地址字节
	End    uint32 `bson:"end"`    // 结束地址字节
	Origin string `bson:"origin"` // 归属地
	Isp    string `bson:"isp"`    // 运营商
}

func main() {
	// 1. 连接 MongoDB
	client, err := mongo.Connect(context.TODO(), options.Client().ApplyURI("mongodb://root:123456@localhost:27017"))
	if err != nil {
		log.Fatal(err)
	}
	defer client.Disconnect(context.TODO())

	coll := client.Database("ip").Collection("local")

	// 2. 清空旧数据(可选)
	_, _ = coll.DeleteMany(context.TODO(), bson.D{})

	// 3. 打开文件
	file, err := os.Open("data.csv")
	if err != nil {
		log.Fatal(err)
	}
	defer file.Close()

	scanner := bufio.NewScanner(file)
	var batch []mongo.WriteModel
	count := 0
	startTime := time.Now()

	for scanner.Scan() {
		line := strings.TrimSpace(scanner.Text())
		if line == "" {
			continue
		}

		parts := strings.Split(line, ",")
		if len(parts) < 4 {
			log.Printf("跳过无效行: %s", line)
			continue
		}

		start := parseIP(parts[0])
		end := parseIP(parts[1])
		if start == 0 || end == 0 {
			log.Printf("解析失败IP: %s", line)
			continue
		}

		doc := &IpInfo{
			Start:  start,
			End:    end,
			Origin: parts[2],
			Isp:    parts[3],
		}

		batch = append(batch, mongo.NewInsertOneModel().SetDocument(doc))
		count++

		// 达到批次大小,执行批量插入
		if len(batch) >= batchSize {
			_, err := coll.BulkWrite(context.TODO(), batch, options.BulkWrite().SetOrdered(false))
			if err != nil {
				log.Fatalf("批量插入失败: %v", err)
			}
			fmt.Printf("已导入 %d 行\n", count)
			batch = batch[:0] // 重用切片,避免频繁分配
		}
	}

	// 插入剩余数据
	if len(batch) > 0 {
		_, err := coll.BulkWrite(context.TODO(), batch, options.BulkWrite().SetOrdered(false))
		if err != nil {
			log.Fatalf("最后一批插入失败: %v", err)
		}
	}

	elapsed := time.Since(startTime)
	fmt.Printf("总共导入 %d 行,耗时: %v\n", count, elapsed)

	// 4. 导入完成后创建索引
	fmt.Println("正在创建索引...")
	_, err = coll.Indexes().CreateOne(context.TODO(), mongo.IndexModel{
		Keys:    bson.D{{"start", 1}},
		Options: options.Index(), // 默认非唯一,无需额外设置
	})
	if err != nil {
		log.Fatal("创建索引失败:", err)
	}
	fmt.Println("索引创建完成!")

    // 测试查询
    var testIpInfo IpInfo
	filter := bson.M{"start": bson.M{"$lte": ip}}
	opts := options.FindOne().SetSort(bson.D{{"start", -1}})
	err = coll.FindOne(context.TODO(), filter, opts).Decode(&testIpInfo)
	if err != nil {
		log.Printf("查询失败: %v", err)
		return
	}

	if ip > testIpInfo.End {
		log.Printf("超出结束区间")
		return
	}

	log.Printf("%s 归属地为 %s", testIp, testIpInfo.Origin)
}

// 将ipv4字符串解析成uint32类型,非ipv4地址返回0
func parseIP(ipstr string) uint32 {
	ip := net.ParseIP(ipstr)
	if ip == nil {
		// 解析ip地址失败
		return 0
	}

	ip = ip.To4()
	if ip == nil {
		// 非ipv4 地址
		return 0
	}

	return uint32(ip[0])<<24 | uint32(ip[1])<<16 | uint32(ip[2])<<8 | uint32(ip[3])
}

四、测试

在阿里云2核4G ESSD服务器(go程序和mongodb都在同一服务器上)

导入测试:

  • CPU 使用率:80%左右
  • 内存:稳定在 1.4GB

查询测试

一万次查询耗时 7.005735651s

2
  1. 支付宝打赏

    qrcode alipay
  2. 微信打赏

    qrcode weixin

评论区