Paxos用来确定一个不可变量的取值,一旦确定将不再更改(不可变性),并且可以获取到(可读取性)。
一个分布式存储系统,可以存储名为var的变量
系统需要保证var取值满足一致性
系统需要满足容错特性
使用单个Acceptor,通过类似互斥锁机制管理并发Proposer
var accepted_value = null
val lock = new Lock()
def prepare(): (Boolean, Object) = {
(lock.tryLock(), acepted_value)
}
def release(): Unit = {
lock.release()
}
def accept(value: Object): Object = {
if(accepted_value == null) {
accepted_value = value
}
release()
accepted_value
}
def propose(value: Object): Object = {
val (res, accepted_value) = prepare()
if(accepted_value != null) {
release()
accepted_value
} else {
if(!res) {
Thread.sleep(1000)
propose(value)
} else {
accept(value)
}
}
}
获得锁的Proposer机器故障会导致死锁
引入抢占式访问权
Proposer向Acceptor申请访问权时指定编号epoch,获得访问权后才能向Acceptor提交取值
Acceptor采用喜新厌旧原则
为了保持一致性
var last_prepare_epoch: Integer = null
var accepted_epoch: Integer = null
var accepted_value: Object = null
val lock = new Lock()
def prepare(epoch: Integer): (Boolean, Integer, Object) = {
lock.lock()
var res = false
if(last_prepare_epoch == null || epoch > last_prepare_epoch) {
last_prepare_epoch = epoch
res = true
}
lock.release()
(res, accepted_epoch, accepted_value)
}
def accept(epoch: Integer, value: Object): (Boolean, Integer, Object) = {
lock.lock()
var res = false
if(epoch == last_prepare_epoch && accepted_value == null) {
accepted_value = value
accepted_epoch = epoch
res = true
}
lock.release()
(res, accepted_epoch, accepted_value)
}
def propose(value: Object): Object {
doPropose(value, 1)
}
private def doPropose(value: Object, epoch: Integer): Object = {
val (res, accepted_epoch, accepted_value) = prepare(epoch, value))
if(accepted_value != null) {
accepted_value
} else {
if(!res) {
Thread.sleep(1000)
propose(value, accepted_epoch + 1)
} else {
doAccept(value, epoch)
}
}
private def doAccept(value: Object, epoch: Integer): Object = {
val (res, accepted_epoch, accepted_value) = accept(epoch, value)
if(accepted_value != null) {
accepted_value
} else {
Thread.sleep(1000)
doPropose(value, accepted_epoch + 1)
}
}
Acceptor机器单点
引入多个Acceptor
少数服从多数原则
某个epoch的取值f被半数以上Acceptor接受,则此var值被确定为f
Propose
同方案二
def acceptors: List[Acceptor]
def numOfAcceptors: Integer = acceptors.size
def propose(value: Object): Object {
doPropose(value, 1)
}
private def doPropose(value: Object, epoch: Integer): Object = {
val prepareResults = acceptors.map(_.prepare(epoch, value)).filter(返回结果的acceptor)
if(prepareResults.size * 2 > numOfAcceptors) {
if(prepareResults.exists(_._3 != null) {
val (_, maxEpoch, maybeValue) = prepareResults.filter(_._3 != null).sort(_._2)
val numOfMaxEpochValue = prepareResults.filter(_._3 == maybeValue)
if(numOfMaxEpochValue * 2 > numOfAcceptors) {
doAccept(maybeValue, epoch)
} else {
Thread(1000)
doPropose(maxEpochValue, epoch + 1)
}
} else {
doAccept(value, epoch)
}
} else {
Thread.sleep(1000)
doPropose(value, epoch + 1)
}
}
private def doAccept(value: Object, epoch: Integer): Object = {
val acceptResults = acceptors.map(_.accept(value, epoch)).filter(成功返回结果的acceptor)
if(acceptResults.size * 2 > numOfAcceptors) {
val (_, maxEpoch, maybeValue) = acceptResults.filter(_._3 != null).sort(_._2).head
val numOfMaxEpochValue = acceptResults.filter(_._3 == maybeValue)
if(numOfMaxEpochValue * 2 >= numOfAcceptors) {
maybeValue
} else {
Thread.sleep(1000)
doPropose(maybeValue, maxEpoch + 1)
}
} else {
Thread.sleep(1000)
doPropose(value, epoch + 1)
}
}
半数以上Acceptor的值是一样的
第一阶段获取修改访问权,第二阶段实施修改操作
不会,要进入第二阶段必须获得半数以上Acceptor的修改访问权,两个半数以上Accptor集合必然存在交集
第二阶段提交后,返回的结果中半数以上是自己提交的值
返回都为空,说明所有Acceptor都没有被设值,当旧的epoch尝试设值的时候,Acceptor会拒绝,因为last_prepare_epoch > 所有旧的epoch
propose时无法获得半数以上Acceptor同意,根据不同的返回结果做相应的处理:
如果已经达成了确定性取值,第一阶段获得半数以上Acceptor修改权限,并且其中必然存在一个Acceptor返回了该确定性取值,Proposer会抛弃自己的值,而努力使该值成为确定性取值
如果必须挑选一个的话,选accepted_epoch最大的,从概率上会更容易产生确定性取值
在正常的Acceptor中必然存在一个Acceptor的取值是出现故障前的确定性取值f,同时该Acceptor的accepted_epoch是最大的,当propose在第一阶段是,会获得该值f,并努力使f成为确定性取值
如果存在Acceptor已经被赋值,那么新的Proposer会努力使最大epoch的值成为确定性取值,如果Acceptor都没有被赋值,Proposer会努力使自己的取值成为确定性取值