若谷学院
互联网公司技术架构分享

一个因中断导致的死锁分析

最近在一次压测过程中暴露出notify client的一个死锁问题,发生死锁的场景是消息的可靠异步发送,具体过程是:

(生产者)消息发送线程拿到队列锁,当队列未满的时候写入消息,释放锁,当队列满的时候,释放锁,等待队列空条件。

(消费者)刷盘线程拿到队列锁,当队列有数据的时候,取数据清空队列,释放锁,再把取出来的消息数据刷盘持久化;没数据的时候,释放锁,等待队列非空条件。

这是一个典型的多生产者-单消费者的问题。起初我们通过review代码来看,都觉得不会发生死锁,因为在临界区域里面只用到了一把锁,不会出现deadly embrace类型的死锁。

后来进一步了解到用户对notify client的使用方式,发现他们的用法比较特殊,用户会把N ms之内没有完成消息发送的任务,强行cancel掉。也就是说生产者可能会在某个时刻检测到interrupt标记位,响应interrupt,是否会产生死锁必须把interrupt的因素也给考虑进去。

一般来说,当捕获到InterruptedException之后,比较规范的做法是把InterruptedException抛给上层调用者;或者调用Thread.currentThread().interrupt(),重新把线程中断标记置为true(因为阻塞方法在抛出InterruptedException,会清除中断标记位),暂不处理中断,把中断留给线程后续处理。基本原则就是要让任务能够优雅地在一个合适的时机响应中断,而不能对中断毫不作为。

在这个案例里面,生产者选择了后者,暂不处理,通过Thread.currentThread().interrupt()重新设置中断标记。

在大部分情况下,这么做是不会有问题的,但是在这种情况下,问题很大。因为在enqueue里面,会响应中断的代码是this.notEmpty.await(),并且是在一个循环里,this.notEmpty.await()会在方法入口处检测是否有中断标记,如果有那么就抛InterruptedException,这样一来一旦抛出第一个InterruptedException,在catch方法块里执行Thread.currentThread().interrupt(),会导致在下一次循环里继续抛出InterruptedException。如果运气好的话,可能在某个时刻this.nextWriteBatch != null条件不成立,跳出循环。如果运气不好的话,可能就是一个死循环。

在这次死锁案例中,是属于运气不好的情况,因为InterruptedException是在this.notEmpty.await()(Condition.await()会在执行过程中释放关联的锁)释放锁enqueueLock之前发生的,也就是说生产者在释放锁之前陷入中断循环。唯一能让this.nextWriteBatch != null不成立的线程是消费者线程,消费者线程没拿到锁,没机会执行this.nextWriteBatch = null。这样一来这个中断循环就成了死循环了,他永远不会释放锁,其他线程会一直阻塞等待锁。

锁定义,以下代码都在同一个实例里

private final Lock enqueueLock = new ReentrantLock();

private final Condition notEmpty = this.enqueueLock.newCondition();

private final Condition empty = this.enqueueLock.newCondition();

生产者代码

private WriteBatch enqueue(WriteCommand writeCommand, boolean sync) throws IOException {

WriteBatch result = **null**;

**this**.enqueueLock.lock();

**try** {

    // 如果没有启动,则先启动appender线程

    **this**.startAppendThreadIfNessary();

    **if** (**this**.nextWriteBatch == **null**) {

        result = **this**.newWriteBatch(writeCommand);

        **this**.empty.signalAll();

    }

    **else** {

        **if** (**this**.nextWriteBatch.canAppend(writeCommand)) {

            **this**.nextWriteBatch.append(writeCommand);

            result = **this**.nextWriteBatch;

        }

        **else** {

            **while** (**this**.nextWriteBatch != **null**) {

                **try** {

                    **this**.notEmpty.await();

                }

                **catch** (InterruptedException e) {

                    Thread._currentThread_().interrupt();

                }

            }

            result = **this**.newWriteBatch(writeCommand);

            **this**.empty.signalAll();

        }

    }

    **if** (!sync) {

        InflyWriteData inflyWriteData = **this**.inflyWrites.get(writeCommand.bytesKey);

        **switch** (writeCommand.opItem.op) {

        **case** OpItem._OP_ADD_:

            **if** (inflyWriteData == **null**) {

                **this**.inflyWrites.put(writeCommand.bytesKey, **new** InflyWriteData(writeCommand.data));

           }

            **else** {

                // update and increase reference count;

                inflyWriteData.data = writeCommand.data;

                inflyWriteData.count++;

            }

            **break**;

        **case** OpItem._OP_DEL_:

            // 无条件删除

            **if** (inflyWriteData != **null**) {

                **this**.inflyWrites.remove(writeCommand.bytesKey);

            }

        }

    }

    **return** result;

}

**finally** {

    **this**.enqueueLock.unlock();

}

}

消费者代码

**public** **void** processQueue() {

    **while** (**true**) {

        WriteBatch batch = **null**;

        **this**.enqueueLock.lock();

        **try** {

            **while** (**true**) {

                **if** (**this**.nextWriteBatch != **null**) {

                    batch = **this**.nextWriteBatch;

                    **this**.nextWriteBatch = **null**;

                    **break**;

                }

                **if** (**this**.shutdown) {

                    **return**;

                }

                **try** {

                    **this**.empty.await();

                }

                **catch** (InterruptedException e) {

                    **break**;

                }

            }

            **this**.notEmpty.signalAll();

        }

        **finally** {

            **this**.enqueueLock.unlock();

        }

        **if** (batch != **null**) {

            **final** DataFile dataFile = batch.dataFile;

            **final** LogFile logFile = batch.logFile;

            **final** List<WriteCommand> cmdList = batch.cmdList;

            **try** {

                **this**.writeDataAndLog(batch, dataFile, logFile, cmdList);

                **this**.processRemove(batch, dataFile, logFile);

            }

            **finally** {

                batch.latch.countDown();

            }

        }

    }

}

 

其实在java技术规范里面,并不推崇,也不提供简单粗暴的任务中断机制,强制要求一个任务立马停止。因为如果一个任务在执行过程中,被非正常取消的话,有可能会导致数据结构被破坏,数据不一致的情况发生。所以java推崇的是通过协作的方式来终止一个任务,一个线程可以向另外一个线程发起终止信号,但是具体如何终止,应该由被终止的线程来决定。被终止的线程可以通过检查终止信号,在一个合适的时机优雅退出。

比如一个任务可以设置一个volatile的cancel标记,当要终止这个任务的时候,我们把cancel标记设置为true,告诉任务我们要终止了。任务在某个时候检查到这个终止标记,知道要终止的,把数据结构维护好,在合适的时机退出。

尽管可以通过设置用户自定义的cancel标记来取消任务,但是也有可能任务调用了一些阻塞方法,比如Condition.await(),一旦阻塞可能就没机会去检测用户自定义的cancel标记,这样一来任务也就不能及时响应用户的取消操作。因此jdk提供了线程内置的interrupt标记,可以通过Thread.currentThread().interrupt()来设置,并且大部分的类库阻塞方法都能会检查这个中断标记位,在中断的时候抛出异常,以便让任务及时响应用户中断。

当然也有一种可能是任务没有使用自定义cancel标记,也没有调用能够抛出InterruptedException的方法,如果对这类任务调用Thread.currentThread().interrupt(),是不会产生预期效果的。因此调用方不应该随意调用Thread.currentThread().interrupt()来取消任务,除非他知道这个任务的中断策略。而作为任务代码编写者,要保证程序健壮,应该考虑一个合适的中断策略,能够在被中断的时候,尽可能及时响应中断,优雅的退出。

重新回到上面的例子,结合业务场景,当用户调用中断的时候,是想取消发送消息任务。任务代码在this.notEmpty.await()检查到线程中断标记,抛出InterruptedException。因为数据结构还没有被破坏,数据状态是一致的,所以无需捕获异常,直接往上层抛出InterruptedException,释放锁,以消息发送失败告终;

private WriteBatch enqueue(WriteCommand writeCommand, boolean sync) throws IOException, InterruptedException {

WriteBatch result = **null**;

**this**.enqueueLock.lock();

**try** {

    // 如果没有启动,则先启动appender线程

    **this**.startAppendThreadIfNessary();

    **if** (**this**.nextWriteBatch == **null**) {

        result = **this**.newWriteBatch(writeCommand);

        **this**.empty.signalAll();

    }

    **else** {

        **if** (**this**.nextWriteBatch.canAppend(writeCommand)) {

            **this**.nextWriteBatch.append(writeCommand);

            result = **this**.nextWriteBatch;

        }

        **else** {

            **while** (**this**.nextWriteBatch != **null**) {

                    **this**.notEmpty.await();

            }

            result = **this**.newWriteBatch(writeCommand);

            **this**.empty.signalAll();

        }

    }

    **if** (!sync) {

        InflyWriteData inflyWriteData = **this**.inflyWrites.get(writeCommand.bytesKey);

        **switch** (writeCommand.opItem.op) {

        **case** OpItem._OP_ADD_:

            **if** (inflyWriteData == **null**) {

                **this**.inflyWrites.put(writeCommand.bytesKey, **new** InflyWriteData(writeCommand.data));

            }

            **else** {

                // update and increase reference count;

                inflyWriteData.data = writeCommand.data;

                inflyWriteData.count++;

            }

            **break**;

        **case** OpItem._OP_DEL_:

            // 无条件删除

            **if** (inflyWriteData != **null**) {

                **this**.inflyWrites.remove(writeCommand.bytesKey);

            }

        }

    }

    **return** result;

}

**finally** {

    **this**.enqueueLock.unlock();

}

}

如果在抛出中断异常的时候,数据结构处于不一致的状态,那么可以先把中断状态保存下来,等数据结构维护好之后再退出。比如下面的任务,子任务A和子任务B必须都执行才能保证数据结构不被破坏。

策略1: process声明中断异常,上层调用者必须考虑如何处理中断异常。Process在执行完A之后被中断,直接撤销A,抛出中断异常,让调用者来处理中断。

**public** **void** process() **throws** InterruptedException{

          doA();

          **try** {

                 Thread._sleep_(1000);

          } **catch** (InterruptedException e) {

                 undoA();

                 **throw** e;

          }

          doB();

   }

策略2:process没有声明中断异常,当中断发生时,选择一不做二不休的策略。先保存中断状态,把A、B都做完了。然后重新标记中断,把中断信号传递下去,让线程后面的逻辑来响应中断。

**public** **void** process() {

          **boolean** cancel=**false**;

          doA();

          **try** {

                 Thread._sleep_(1000);

          } **catch** (InterruptedException e) {

                 cancel=**true**;

          }

          doB();

          **if** (cancel) {

                 Thread._currentThread_().interrupt();

          }

   }

原文出自:http://jm.taobao.org/archives/page/13/

好烂呀没啥价值凑合看看还不错很精彩 (还没有人评分)
Loading...
本站文章来自互联网一线技术博客,若有侵权,请联系我们:若谷技术学院 » 一个因中断导致的死锁分析
关注若谷技术,获得个性化即时架构文章推送

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址

全球互联网技术架构,前沿架构参考

联系我们博客/网站内容提交