在MongoDB里实现循环序列功能

ptv
在MongoDB⾥实现循环序列功能
环境是这样的:服务器是⽤Java做的, 数据库是MongoDB
需求是这样的:我们的系统⾥要⽣成⼀个唯⼀ID,前⾯的部分有⼀定的格式,并和时间关联, 精确到微秒,考虑到同⼀微秒内有可能存在并发情况, 所以后⾯在加两位序列号, 系统需要定义为1毫秒内的并发⼩于100个,所以后⾯两位就够⽤了。 Java服务器端有多台机器都可以⽤来⽣成这个唯⼀ID,所以需要在不同的机器上不能⽣成相同的序列号,所以需要在某⼀点上做全局的范围同步来保存这序列号的唯⼀性。 其实如果不考虑需求⾥的唯⼀ID是有⼀定意义的格式的, ⽤UUID或MongoDB的ObjectId都是更好的选择,完全不需要在某⼀点上进⾏同步,性能会更好。
这个可以⽣成序列号的点, 我们可以做⼀个序列号⽣成服务器来对应, 也可以⽤数据库来对应。 单单为这个简单的功能准备⼀个服务器来做显然不合适。 但是我们⽤的MongoDB并没有类似于MySQL或Oracle中的SELECT FOR UPDATE这样的锁机制。 所以没有办法简单的对这个序列号做原⼦操作。 但是MongoDB的对单个document进⾏update操作中有很是具有原⼦性的, 例如
$set
$unset
$inc
$push
$pushAll
$pull
$pullAll
我们可以利⽤这些原⼦操作,在数据库层以乐观锁的形式来实现循环序列字段。为了⽅便调⽤我把这段逻辑做成数据库中的Javascript函数。 类似与MySQL中的存储过程。
⾸先我们需要⼀个collection来存放序列号,并对需要的需要的序列号进⾏初始化。我们叫它counters。
然后我们想system.js⾥添加⼀个Javascript函数
db.system.js.save({_id:"getNextUniqueSeq",
value:function (keyName) {
var seqObj = db.counters.findOne({_id:keyName});
if (seqObj == null) {
print("can not find record with key: " + keyName);
return -1;
}
// the max value of sequence
var maxVal = seqObj.maxval;
// the current value of sequence
var curVal = seqObj.val;
while(true){
/
/ if curVal reach max, reset it
if(curVal >= maxVal){
var err = db.getLastErrorObj();
if( err && de ) {
print( "unexpected error reset data: " + tojson( err ) );
return -2;
} else if (err.n == 0){
// fail to reset value, may be reseted by others
print("fail to reset value: ");
}
/
/ get current value again.
seqObj = db.counters.findOne({_id:keyName});
maxVal = seqObj.maxval;
curVal = seqObj.val;
continue;
}
// if curVal not reach the max, inc it;
// increase
var err = db.getLastErrorObj();
if( err && de ) {
print( "unexpected error inc val: " + tojson( err ) );
return -3;
} else if (err.n == 0){
// fail to reset value, may be increased by others
gbomprint("fail to inc value: ");
// get current value again.
seqObj = db.counters.findOne({_id:keyName});
maxVal = seqObj.maxval;
curVal = seqObj.val;
continue;
} else {
var retVal = curVal + 1;
print("success to get seq : " + retVal);
// increase successful六自由度
return retVal;
}
}
}
});
上⾯这段会把指定的序列号的val值+1,如果val达到上限则从0开始。所以叫循环序列。
其实上⾯的实现在原理上和Java⾥的AtomicInteger系列的功能实现是类似的,利⽤循环重试和原⼦性的CAS来实现。这种实现⽅式在多线程的环境⾥由于锁(Monitor)的范围很⼩,所以并发性上⽐排他锁要好⼀些。
下⾯我们⽤Java来测试⼀下这个函数的正确性。 即在多线程的情况下会不会得到重复的序列号。
第⼀个测试,val=0, maxval=2000, Java端20个线程每个线程循环调⽤100次。 共2000次。 所以正确的情况下,从0到1999应该每个数字只出现⼀次。
@Test
public void testGetNextUniqueSeq1() throws Exception {
final int THREAD_COUNT = 20;
final int LOOP_COUNT = 100;
Mongo mongoClient = new Mongo("172.17.2.100", 27017);
DB db = DB("im");
db.authenticate("imadmin", "imadmin".toCharArray());
BasicDBObject q = new BasicDBObject();
q.put("_id", "UNIQUE_KEY");
BasicDBObject upd = new BasicDBObject();
BasicDBObject set = new BasicDBObject();
set.put("val", 0);
set.put("maxval", THREAD_COUNT * LOOP_COUNT);
upd.put("$set", set);
Thread[] threads = new Thread[THREAD_COUNT];
final int[][] results = new int[THREAD_COUNT][LOOP_COUNT];
马背上有一根for (int i = 0; i < THREAD_COUNT; i++) {
final int temp_i = i;
threads[i] = new Thread("" + i) {
@Override
public void run() {
try {
Mongo mongoClient = new Mongo("172.17.2.100", 27017);
DB db = DB("im");
db.authenticate("imadmin", "imadmin".toCharArray());
for (int j = 0; j < LOOP_COUNT; j++) {
Object result = db.eval("getNextUniqueSeq(\"UNIQUE_KEY\")");
System.out.printf("Thread %s, seq=%d\n", Thread.currentThread().getName(), ((Double) result).intValue());
results[temp_i][j] = ((Double) result).intValue();
}
} catch (UnknownHostException e) {
e.printStackTrace();
}
}
};
}
for (Thread thread : threads) {
thread.start();
}
for (Thread thread : threads) {
thread.join();
}
for (int num = 1; num <= LOOP_COUNT * THREAD_COUNT; num++) {
// every number appear 1 times only!
int times = 0;
for (int j = 0; j < THREAD_COUNT; j++) {
for (int k = 0; k < LOOP_COUNT; k++) {
if (results[j][k] == num)
times++;
}
}
assertEquals(1, times);
}
}
然后我们再测试⼀下循环的情况。 val=0, maxval=99。 同样是Java端20个线程每个线程循环调⽤100次。 共2000次。这次从0到99的数字每个应该取得20次。
@Test
public void testGetNextUniqueSeq2() throws Exception {
final int THREAD_COUNT = 20;
final int LOOP_COUNT = 100;
Mongo mongoClient = new Mongo("172.17.2.100", 27017);
DB db = DB("im");
db.authenticate("imadmin", "imadmin".toCharArray());
BasicDBObject q = new BasicDBObject();
q.put("_id", "UNIQUE_KEY");
BasicDBObject upd = new BasicDBObject();
BasicDBObject set = new BasicDBObject();
set.put("val", 0);
set.put("maxval", LOOP_COUNT);
upd.put("$set", set);
Thread[] threads = new Thread[THREAD_COUNT];
final int[][] results = new int[THREAD_COUNT][LOOP_COUNT];
for (int i = 0; i < THREAD_COUNT; i++) {
final int temp_i = i;
threads[i] = new Thread("" + i) {
@Override
蔡铁根public void run() {
try {
Mongo mongoClient = new Mongo("172.17.2.100", 27017);
DB db = DB("im");
db.authenticate("imadmin", "imadmin".toCharArray());
for (int j = 0; j < LOOP_COUNT; j++) {
Object result = db.eval("getNextUniqueSeq(\"UNIQUE_KEY\")");
System.out.printf("Thread %s, seq=%d\n", Thread.currentThread().getName(), ((Double) result).intValue());
results[temp_i][j] = ((Double) result).intValue();农村业余文化生活
}
} catch (UnknownHostException e) {
e.printStackTrace();
}
}
};
}
for (Thread thread : threads) {
thread.start();
}
for (Thread thread : threads) {
thread.join();
}
for (int num = 1; num <= LOOP_COUNT; num++) {
// every number appear 20 times only!
int times = 0;
for (int j = 0; j < THREAD_COUNT; j++) {
for (int k = 0; k < LOOP_COUNT; k++) {
if (results[j][k] == num)
times++;
}
}
assertEquals(20, times);
}
}
这个测试跑了⼏次都是正确的。
由于没有可以进⾏对⽐其他的实现⽅式(例如排他锁)所以没有做性能测试。
写在最后。 虽然MongoDB⽀持类似于存储过程的Stored Javascript,但是其实不建议使⽤这个来解决复杂问题。主要原因是没法调试,维护起来太不⽅便。⽽且在2.4之前MongoDB对服务端Javascript⽀持并不是很好, ⼀个mongod进程同时只能执⾏⼀段Javascript。如果能在应⽤层解决掉还是在应⽤层⾥实现逻辑⽐较好。

本文发布于:2024-09-22 04:36:27,感谢您对本站的认可!

本文链接:https://www.17tex.com/xueshu/229293.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

标签:序列号   循环   需要   实现   没有   情况   函数   功能
留言与评论(共有 0 条评论)
   
验证码:
Copyright ©2019-2024 Comsenz Inc.Powered by © 易纺专利技术学习网 豫ICP备2022007602号 豫公网安备41160202000603 站长QQ:729038198 关于我们 投诉建议