我的知识库

知识等于力量

« 当Maven遇上MyEclipse一幅图总结出外国人和中国人的区别 »

使用异步Servlet处理挂起线程

摘要

  BEA WebLogic Server 9.2及以上版本将公开一个Abstract Asynchronous Servlet类,可用于解除接收servlet请求与发送其响应之间的耦合。该类还提供了一个Future Response Servlet,用于支持服务器使用一个不同的线程(而不是处理传入请求的线程)来处理servlet响应。传统servlet模型的这两个扩展都可以避 免挂起线程并将长时间运行的作业与servlet范例集成在一起。本文将介绍这两个特性,同时将提供一些示例。

简介

  是否希望通过在Web应用程序中控制响应时间来实现服务质量(QoS)?从最简单的形式来说,服务质量需求就是将响应时间控制在特定的时间(秒)内。如果无法满足这一需求,则会提供一条有意义的错误消息。

  传统的servlet线程模式相当简单:应用服务器分配特定数量的线程,并提供给Web应用程序使用(参见图1)。当新请求传入并可供服务使用时,应用程序将分配一个线程。从此之后,servlet线程将一直占用内存池,直到它完成所有任务。

  Traditional servlet threading model

  图1. 传统servlet线程模式

  如果servlet需要运行一个长时间的任务,则会造成一些问题。 要解决这一问题,最常用的方法是使用JMS或Message Driven Bean解除Web请求与长时间运行后台进程之间的耦合。这样便可解决不需要将处理结果立即返回响应的情况。

  这听上去像是一个“Fire-and-forget”场景。但是,如果需要向客户机返回一些数据,那么servlet线程极有可能会成为挂起线程。

问题

  在大多数情况下,挂起线程的意思就是在联系后台系统获取发送请求所需要的数据时受到了阻塞。其典型场景就是通过JDBC连接远程数据库。

  问题的关键在于,如果所需资源速度变慢或完全不可用,则需要确保应用程序知道如何处理这一情况。大多数情况下,这并不是什么问题,因为超时套接字到远程数据库服务的代码都由数据源和JDBC驱动程序处理。但是,在需要手动编写超时策略时,该场景将变得极为复杂。

  许多程序员都害怕处理网络超时。最常见的问题是,将没有超时支持的单线程网络客户机扩展为复杂的多线程时,每个单独的线程都需要测试网络超时,并且阻塞线程与主应用程序之间需要某种形式的通知流程。

  本文将介绍如何使用BEA WebLogic Server的未来响应模型(Future Response Model)来编写能够有效处理超时的Java Web应用程序。这一过程非常简单。通过解除响应与传入请求及超时无响应请求之间的耦合,该模型还可以防止挂起线程。为避免这种线程挂起场景, WebLogic Server提供了两个类专门用于异步处理HTTP请求,其原理是解除响应与处理传入请求的线程之间的耦合。以下部分将详细介绍这两个类。

抽象异步Servlet类

  AbstractAsyncServlet类的实现将解除接收servlet请求与发送响应之间的耦合。我们实现Abstract Asynchronous Servlet类的方法是扩展 weblogic.servlet.http.AbstractAsyncServlet 类。以下是需要实现的方法:

  • public boolean doRequest(RequestResponseKey rrk):该方法是联系servlet的初始点。它接受的输入参数为RequestResponseKey类,RequestResponseKey类是 传统servlet请求的包装器。下文将会介绍,我们可以使用请求判断servlet是否响应。
  • public void doResponse (RequestResponseKey rrk, Object context):该方法将处理servlet响应。正如本文所述,初始线程并不需要处理此方法。
  • public void doTimeout (RequestResponseKey rrk):如果未在特定时间段内发送servlet响应,则服务器将会触发doTimeout()方法。

  Servlet类还提供了一个静态方法:

  • static void notify(RequestResponseKey rrk, Object context):调用此方法将通知服务器应该向键rrk发送一个响应。

  只实现抽象类还不足以实现解耦。解除请求与响应之间的耦合可以通过doRequest()方法中的 TimerListener 类来实现。可以使用工厂模式获得TimerListener类的一个新实例(实际上是通过TimerManagerFactory)。 TimerListener线程可以针对AbstractAsyncServlet实例调用notify()方法,从而最终触发响应的发送。以下示例将演 示如何创建这种计时器:

 TimerManagerFactory.getTimerManagerFactory().
     getDefaultTimerManager().schedule(new TimerListener() {
       public void timerExpired(weblogic.timers.Timer arg0) {
         try {
           // This will trigger delivery of the response.
          AbstractAsyncServlet.notify(rrk, null);
         }
         catch (Exception e)
        {
         e.printStackTrace();
        }
     }
}, 1000);

  此处的关键在于,如果该类位于doRequest()方法中,则需要等到计时器到期并调用notify()方法时才会发送响应。反过来, notify()将调用doResponse()方法。如果未在指定时间内调用notify()方法,则会调用doTimeout()方法。

  图2展示了客户机与服务器之间的交互流程图。

  Request/response decoupling using AbstractAsyncServlet

  图2.使用AbstractAsyncServlet实现请求/响应解耦

  我们来看看该模式的典型使用。其中一个场景为,服务质量应用程序需要在特定时间内向客户机发送一个响应消息(正如前面如述)。但是,即便是不同 的用例(如延时的请求处理)也可以为我们带来好处:试想某个应用程序需要访问远程资源(比如说远程DB链接)或CPU密集算法。在此类场景中,延时处理请 求(固定时间量)可以帮助防止过多同时访问。到目前为此,惟一的选择便是等待使用有价值的线程。

  下面将演示该方法的应用示例。

Abstract Asynchronous Servlet的应用示例

  假设,某个应用程序的作用是输出股票交易报价。我们将使用JCA接口获取股票价值,而该接口用于访问某个银行系统。用户需要在几秒钟之内获取反馈。但是,如果数千名用户在同一时间段时连接系统,则响应极有可能会延时,并且最坏的情况将导致HTTP连接超时。

  不过所造成的损害远不止于此:如果用户发现该站点无法响应,则会单击刷新按钮,大量的新请求将会淹没应用服务器。

  需要采取相同的措施来制止这一问题。如果后台资源受到阻塞,则无法满足用户的请求。但是,如果在收集最新的股票代码时就将其存入缓存,那么就可 以在超时回调之前生成此信息,同时指出数据采集的时间。在本例中,我们将超时设置为2500毫秒。如果在此时间内未生成响应,则会调用doTimeout ()方法。图3显示了其体系结构。

  doTimeout callback

  图3. 未及时通知servlet时将会触发doTimeout()回调

  此servlet的详细代码如下所示:

public class AsyncServlet extends AbstractAsyncServlet {

   // Need JDK 1.5
   Hashtable  <String,Long> hashStocks = new Hashtable<String,Long>();
   Date dateLastUpdate = new Date();

   public void init() {
   // The default request timeout period in milliseconds.
         super.setTimeout(2500);
         // The default interval period in milliseconds at which
        // requests will be checked for timeouts.
         super.setScavangeInterval(1);
   // Populate hashtable with Stock initial value
         hashStocks = readStocksValue();
   }

  public boolean doRequest(final RequestResponseKey rrk)
         throws ServletException, IOException {
 
     HttpServletRequest req = rrk.getRequest();
     HttpServletResponse res = rrk.getResponse();
 
     // Read Stock quotation from Back end
     hashStocks = readStocksValue();
 
     TimerManagerFactory.getTimerManagerFactory().getDefaultTimerManager()
                  .schedule(new TimerListener() {
 
        public void timerExpired(weblogic.timers.Timer arg0) {
            try {     
             // This will trigger delivery of the response.
             AbstractAsyncServlet.notify(rrk, null);
             } catch (Exception e) {
          e.printStackTrace();
             }     
        }     
      }, 1000);
 
     return true; 
  }

  public void doResponse(RequestResponseKey rrk, Object context)
                          throws ServletException, IOException {

         HttpServletRequest req = rrk.getRequest();
         HttpServletResponse res = rrk.getResponse();

         // Display fresh quotations
         displayStockQuotations(res);
  }

  public void doTimeout(RequestResponseKey rrk) throws ServletException,
                  IOException {
         HttpServletRequest req = rrk.getRequest();
         HttpServletResponse res = rrk.getResponse();
 
         // Hung thread. Recover quotations from the cache
         displayStockQuotations(res);
  }
 
  private Hashtable<String,Long> readStocksValue() { 
         Hashtable<String,Long> hash = new Hashtable<String,Long>();
 
         hash.put("XOM", new Long(75));
         hash.put("GE", new Long(1005));
         hash.put("JPM", new Long(1008));
         dateLastUpdate = new Date();
 
         return hash;
  }
 
  private void displayStockQuotations(HttpServletResponse res) {
         PrintWriter pw = null;
         try {
            pw = res.getWriter();
         } catch (IOException e) { 
            e.printStackTrace();
         }
         pw.println("Quotations at :" +dateLastUpdate);
         Enumeration<Long> enumStocks = hashStocks.elements();
         while (enumStocks.hasMoreElements()) {
            pw.println(enumStocks.nextElement());
            pw.flush();
         }
         pw.close();
   }
}

  这种方法的主要好处在于,我们可以在提供应答的同时保持线程队列的执行效率。这有利于实现伸缩性较好的应用程序,并改进用户体验。

  这种方法刻意保持了简单性;毕竟,本文的实际目的只是通过此示例演示一个特性。

  在实际的股票系统中,我们需要添加一些复杂性,即根据用户的类型对响应消息进行自定义。比如说,与股票系统订定合约的用户或股票代理可能需要较长的超时或完全没有超时。对于所有其他用户,交付报价的合理延时需要维持在几秒钟的范围之内。

Javascript Hacking

  Ajax应用程序的出现极大地改变了Web应用程序的样式。使用Ajax构建的应用程序可生成内容更加丰富且更具动态性的Web站点,比如说 Google Maps、MySpace、Gmail和Netflix站点。此类站点需要在底层执行大量工作,因此用户只需填写一些表单,而应用程序会自动将必要的信息 提供给用户。

  遗憾的是,公共Ajax最佳实践尚未开发,因此开发过程中比较容易出现各类问题。

  问题在于,Web 2.0站点比Web 1.0站点更容易受到攻击:每次按键或鼠标单击操作都会转换为使用中的执行线程。因此,考虑到Ajax应用程序的这种特性,一些糟糕的机制所生产的短期 HTTP请求可能会耗尽WebLogic Server的资源。并且一些恶意已经开始利用这一问题也就不足为奇了(因为只需几行代码便可实现)。在与系统管理员的对抗中,他们甚至将此类攻击称作 Javascript hacking。

Future Servlet to the Rescue

  WebLogic Server 6.1中已经有了未来servlet的身影,不过此功能一直都是“隐藏”在WebLogic Server中的。到WebLogic Server 9.1发行时,BEA才最终公开了这一模型。因此,我们将了解一下这个未来servlet模型究竟有何帮助。要使用该servlet,需要实现 weblogic.servlet.FutureResponseServlet 类。

  我们不再需要实现doRequest()、doResponse()和doTimeout()方法,惟一要做的就是定义一个处理servlet请求的服务:

 public void service(HttpServletRequest req, FutureServletResponse rsp)

  如果决定提供即时应答,只需要对FutureServletResponse实例调用send方法。但是,如果希望在以后延缓响应,则需要提供一个 TimerTask 类的扩展。下面给出了示例代码:

     
Timer timer = new Timer();     
ScheduledTask mt = new ScheduledTask(rsp, timer);
myTimer.schedule(mt, 100);
 
......
 
public class ScheduledTask extends TimerTask {   
  private FutureServletResponse rsp;   
  Timer timer;                                          
  
  ScheduledTask(FutureServletResponse rsp, Timer timer){      
    this.rsp = rsp;      
    this.timer = timer;    
  }    
  
  public void run() {
   try {        
      PrintWriter out = rsp.getWriter();        
      out.println("This is a delayed answer");        
      rsp.send();        
      timer.cancel();      
   } catch(IOException e) {
      e.printStackTrace();      
   }    
  }

  但是,如何才能在实际用例中利用这个类呢?我们再次回到Ajax范型。当新请求传入时,如果遵循传统模型,则应该向客户机提供一个即时响应并继 续等待新请求。但是此处就是关键:如果将客户机请求存入缓冲区,并立即回复客户机,那么将在不同的线程中异步处理请求,且仅当后台进程终止时才发送响应。

  换句话说,我们不再需要客户机-服务器轮询,而只处理服务器请求并在处理终止时通知客户机。解除请求与响应之间的耦合可以防止执行过于频繁,从而使应用程序更易于伸缩。还可以用于避免被Ajax请求淹没

  但是,客户如何才能知道服务器-客户机何时终止呢?当我们处理事件驱动型编程时,需要使用观察者模式(Observer Pattern),如图4所示。

  如果对于此模式还不太熟悉,我们给出了其简短描述。该模式需要依赖三个角色:Subject、Observer和Concrete Observer。

  Observer pattern

  图4. 观察者模式

  • Subject将提供一个接口,用于附加和分离Observer。
  • Observer将为所有定义一个更新接口,用于从Subject处接收更新通知。
  • Concrete Observer将维护与Subject的一个引用,这样便可在接收到通知时接收Subject的状态。最终将执行它所包含的函数。

  以下是该方法的应用示例:

public class FutureServlet extends FutureResponseServlet {

long time = 100;
public void service(HttpServletRequest req, FutureServletResponse rsp)   
                                       throws IOException, ServletException {
   PrintWriter pw=null;
   try {
     pw = rsp.getWriter();
   } catch (IOException e) {
     e.printStackTrace();
   }
   pw.println("Request arrived");
   pw.flush();
   new Subject(req,rsp);

}

class Subject extends Observable {
private static Stack <FutureServletResponse>
                 stackResponses = new Stack <FutureServletResponse>();

public Subject (HttpServletRequest req, FutureServletResponse rsp){

  PrintWriter pw=null;
  try {
    pw = rsp.getWriter();
  } catch (IOException e) {
    e.printStackTrace();
  }
  pw.println("Request sent to server");
  pw.flush();

  addObserver(new ConcreteObserver());
  stackResponses.push(rsp);
  startListening();
}

public void startListening() {
  Thread t = new Thread() {
   public void run() {
    while (true) {
     try {
      Thread.sleep(1000);
     } catch (InterruptedException e) {
      e.printStackTrace();
     }
     if (stackResponses.isEmpty()) {
      continue;
     }
     FutureServletResponse rsp =  stackResponses.pop();

     setChanged();
                    // Notify event to Observer attached
     notifyObservers(rsp);
    }
   }
  };
  t.start();
}

class ConcreteObserver implements Observer {
  public ConcreteObserver() {}

  public void update(Observable o, Object arg) {
   FutureServletResponse rsp = (FutureServletResponse) arg;
   PrintWriter pw=null;
   try {
     pw = rsp.getWriter();
     pw.println("Response sent at :" + new Date() + "

   ");
     pw.flush();
     rsp.send();
     pw.close();
   } catch (IOException e) {
    e.printStackTrace();
   }
  }
}
}

  如您所见,当接收到新请求时,请求和响应将存储在一个栈中。然后,Subject类将注册为ConcreteObserver类的Observer,用于保存需要在发生通知时执行业务函数。

  通过这种简单而有效的方法,我们可以使Ajax应用程序更具伸缩性,从而降低执行线程的影响。该线程现在已经可以同时处理较多的请求了。

  据BEA称,“此模型可为响应的处理方式提供全面的控制,并且可在线程处理方面提供更多的控制。但是,使用该类来避免挂起线程需要开发人员自己提供大部分代码。”他们建议在大多数情况下使用Abstract Asynchronous Servlet。

结束语

  在理想的测试条件下,并不会出现网络超时并且处理超时的工作几乎可以忽略不计。但是,当到达服务的请求开始增加时,设计较差的Web应用程序可能会增加服务的负担并最终造成网络客户无限制阻塞。

  考虑到这些原因,防止挂起线程的扩散是非常有必要的。Abstract Asynchronous Servlet可以解除响应与传入请求和超时无响应请求之间的耦合。

  但是,如果需要将响应调整到未来的特定时间,并且需要全面控制线程处理,那么可以采用Future Response Servlet接口。

参考资料

原文出处:http://dev2dev.bea.com/pub/a/2007/12/asynchronous-servlets.html

 作者简介
  Francesco Marchioni 于1997年加入Java社区,是一名经过认证的Sun企业架构师。他是Pride SpA的雇员,为BEA客户设计和开发了许多在Weblogic Platform上使用的J2EE应用程序。
dot dot dot

dot
  作者其它文章

Search

导航

热门文章

最新文章

Powered By duduwolf's wiki 1.0

Copyright 1999-2007 duduwolf.com Some Rights Reserved.