-
Notifications
You must be signed in to change notification settings - Fork 36
/
mongo.go
141 lines (116 loc) · 3.34 KB
/
mongo.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
package mongo2
import (
"context"
"fmt"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo/readpref"
"time"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)
// Result is the document shape stored in (and decoded from) the collection.
//
// The field order matters: Set builds values with an unkeyed composite
// literal, so fields must stay in the order Id, Data, CreateTime.
type Result struct {
	// Id is the lookup key; it is stored as the document's "_id".
	Id string `json:"_id" bson:"_id"`
	// Data is the opaque payload saved for the key.
	Data []byte `json:"data" bson:"data"`
	// CreateTime is the time the document was written. Get compares it
	// against Client.Expires, and InitIndex builds its TTL index on the
	// "create_time" field.
	CreateTime time.Time `json:"create_time" bson:"create_time"`
}
// Client wraps a MongoDB driver client together with the database,
// collection and expiry settings used by Get and Set.
//
// The field order matters: NewMongoClient builds the value with an unkeyed
// composite literal, so fields must stay in the order declared here.
type Client struct {
	// Uri is the connection string handed to the driver in GetClient.
	Uri string
	// DB is the database name used by GetCollection.
	DB string
	// Collection is the collection name used by GetCollection.
	Collection string
	// Expires is the document lifetime in seconds. Values <= 0 disable both
	// the TTL index setup in Init and the expiry check in Get.
	Expires int

	// cli is the underlying driver client; it is assigned by Init.
	cli *mongo.Client
}
// NewMongoClient builds a Client for the given server and initializes it by
// calling Init (connect, ping, and TTL index setup when expires > 0).
//
// When user is non-empty the URI has the form
// "mongodb://user:password@host:port"; otherwise it is "mongodb://host:port".
// user and password are inserted into the URI verbatim, so callers are
// responsible for any percent-encoding the URI syntax requires.
//
// The function panics with a "YTask: init mongo error" message if Init
// returns an error.
//
// Note from the original author: testing showed that mongo-driver reconnects
// automatically and ships with its own connection pool, but setting the pool
// size did not seem to have any effect, so no poolSize option is offered.
func NewMongoClient(host, port, user, password, db, collection string, expires int) *Client {
	uri := fmt.Sprintf("mongodb://%s:%s", host, port)
	if user != "" {
		uri = fmt.Sprintf("mongodb://%s:%s@%s:%s", user, password, host, port)
	}

	client := &Client{
		Uri:        uri,
		DB:         db,
		Collection: collection,
		Expires:    expires,
	}
	if err := client.Init(); err != nil {
		panic("YTask: init mongo error : " + err.Error())
	}
	return client
}
// =======================
// High-level API
// =======================
// Get looks up the document whose "_id" equals key and decodes it into a
// Result. The lookup runs with a 10 second timeout.
//
// Any error from the lookup or decode is returned as-is (for a missing key
// that is mongo.ErrNoDocuments). When c.Expires > 0, a document whose
// CreateTime plus Expires seconds lies in the past is also reported as
// mongo.ErrNoDocuments; the decoded document is still returned alongside the
// error in that case.
func (c *Client) Get(key string) (Result, error) {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	var doc Result
	found := c.GetCollection().FindOne(ctx, bson.D{{Key: "_id", Value: key}})
	if err := found.Decode(&doc); err != nil {
		return doc, err
	}
	if c.Expires <= 0 {
		return doc, nil
	}

	// MongoDB does not purge expired documents immediately, so the expiry
	// has to be checked here as well.
	deadline := doc.CreateTime.Add(time.Duration(c.Expires) * time.Second)
	if deadline.Before(time.Now()) {
		return doc, mongo.ErrNoDocuments
	}
	return doc, nil
}
// Set stores value under key, stamping the document with the current time.
// The write runs with a 10 second timeout and returns the driver's error, if
// any.
//
// The document is written with a single ReplaceOne upsert instead of a
// FindOne followed by InsertOne/ReplaceOne. The two-step form was racy: two
// concurrent writers could both see "not found" and one insert would fail
// with a duplicate key, and a document removed by TTL between the find and
// the replace would make ReplaceOne match nothing and drop the value.
func (c *Client) Set(key string, value []byte) error {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	filter := bson.D{{Key: "_id", Value: key}}
	doc := Result{Id: key, Data: value, CreateTime: time.Now()}

	// Upsert: replace the document if it exists, insert it otherwise.
	_, err := c.GetCollection().ReplaceOne(ctx, filter, doc, options.Replace().SetUpsert(true))
	return err
}
// GetClient opens a new driver client for c.Uri using ctx and returns it
// together with any connection error. It does not store the client on c.
func (c *Client) GetClient(ctx context.Context) (*mongo.Client, error) {
	opts := options.Client().ApplyURI(c.Uri)
	return mongo.Connect(ctx, opts)
}
// GetCollection returns a handle to the collection named c.Collection in the
// database named c.DB. It uses the client stored by Init, so Init must have
// succeeded first.
func (c *Client) GetCollection() *mongo.Collection {
	database := c.cli.Database(c.DB)
	return database.Collection(c.Collection)
}
// Init connects to the server, verifies the connection with a ping against
// the primary, and, when c.Expires > 0, runs InitIndex on the collection.
// All steps share one 10 second timeout.
//
// The result of GetClient is stored on c before its error is checked. The
// first error encountered is returned.
//
// NOTE(review): if the ping or index setup fails, the client that was just
// connected is left as-is (it is not disconnected here).
func (c *Client) Init() error {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	client, err := c.GetClient(ctx)
	c.cli = client
	if err != nil {
		return err
	}

	if err := c.cli.Ping(ctx, readpref.Primary()); err != nil {
		return err
	}

	// Without an expiry there is no TTL index to set up.
	if c.Expires <= 0 {
		return nil
	}
	return c.InitIndex(ctx, c.GetCollection().Indexes())
}
// InitIndex makes sure the collection behind index has an index on
// "create_time", creating a TTL index with expireAfterSeconds = c.Expires
// when none exists.
//
// The existing indexes are scanned for one keyed solely on "create_time".
// The previous check ("create only when the collection has no indexes at
// all") skipped creation whenever any index was listed, including the
// built-in "_id_" index of an existing collection. An existing
// "create_time" index is left untouched, whatever its options, so that a
// differing expiry does not turn into an index-conflict error.
//
// The listing cursor is closed before returning. Errors from listing,
// decoding, cursor iteration or index creation are returned as-is.
func (c *Client) InitIndex(ctx context.Context, index mongo.IndexView) error {
	cur, err := index.List(ctx)
	if err != nil {
		return err
	}
	defer cur.Close(ctx)

	for cur.Next(ctx) {
		// Only the key specification is needed to recognize the index.
		var spec struct {
			Key bson.D `bson:"key"`
		}
		if err := cur.Decode(&spec); err != nil {
			return err
		}
		if len(spec.Key) == 1 && spec.Key[0].Key == "create_time" {
			return nil
		}
	}
	if err := cur.Err(); err != nil {
		return err
	}

	// No index on create_time yet: create the TTL index.
	indexOps := options.Index().SetExpireAfterSeconds(int32(c.Expires))
	_, err = index.CreateOne(ctx, mongo.IndexModel{
		Keys:    bson.D{{Key: "create_time", Value: 1}},
		Options: indexOps,
	})
	return err
}