Skip to content

《Java并发编程实战》7.取消与关闭(中) 停止基于线程的服务 #34

@funnycoding

Description

@funnycoding

应用程序通常会创建拥有多个线程的服务,例如线程池。

并且这些服务的生命周期通常比创建它们的方法的生命周期更长。 如果应用程序准备退出,那么这些服务所拥有的线程也需要结束。

由于无法通过抢占式的方法来停止线程,因此它们需要「自行结束」。

正确的封装原则是:除非拥有某个线程,否则不能对线程进行操控。 例如:中断线程或者修改线程的优先级等。

在线程 API 中,并没有对线程的所有权给出正式的定义:线程由 Thread 对象表示,并且像其他对象一样可以被自由共享。 然而线程有一个相应的所有者,即创建该线程的类。 因此线程池是其工作者线程的所有者<---工作线程在线程池内被创建,如果要中断这些线程,那么应该使用线程池。【<---通过线程池来做中断,而不是直接通过工作线程本身来中断?】

与其他封装对象一样,线程的所有权是不可传递的:应用程序可以拥有服务,服务也可以拥有工作线程,但应用程序并不能拥有工作者线程,因此应用程序不能直接停止工作者线程

相反,服务应该提供**生命周期方法(Lifecycle Method)**来关闭它自己以及它所拥有的线程。 这样,当应用程序关闭该服务时,服务就可以关闭所有的线程了。

ExecutorService 中提供了 shutdonshutdownNot 等方法。 同样,在其他拥有线程的服务中,也应该提供类似的关闭机制。

对于持有线程的服务,只要服务的存在时间大于创建线程的方法的存在时间,那么就应该提供生命周期方法。

7.2.1 示例:日志服务

在大多数服务器应用程序中都会用到日志,例如在代码中插入 println 语句就是一种简单的日志。 像 PrintWriter 这样的字符流类是线程安全的,因此这种简单的方法不需要显示的同步。

但是如果需要在单条日志信息中写入多行,那么要通过**「客户端加锁」来避免多个不正确地交错输出。如果两个线程同时把多行栈追中信息(Stack Trace)** 添加到同一个流中,并且每行信息对应一个 println 调用,那么这些信息在输出中将交错在一起,看上去就是一些虽然庞大但毫无意义的栈追踪信息。

然而,在11.6节中将看到这种内联日志功能会给一些高容量的(Highvolume)应用程序带来一定的性能开销。 另一种替代方法是通过调用 log 方法将日志消息放入某个「队列」中,并由「其他线程」来处理。

程序清单 7-13LogWriter 中给出了一个简单的日志服务示例,其中日志操作是在单独的**「日志线程」** 中执行的。产生日志信息的线程并不会将消息直接写入输出流,而是由 LogWriter 通过 BlockingQueue 将信息提交给日志线程,并由日志线程写入。

这是一种 多生产者单消费者(Multiple-Producer,Single-Consumer) 的设计方式:每个调用 log 的操作都相当于一个生产者,而后台的日志线程则相当于消费者。 如果消费者的处理速度低于生产者的生成速度,那么 BlockingQueue 将阻塞生产者,直到日志线程有能力处理新的日志消息。

程序清单 7-13 不支持关闭的 生产者—消费者 日志服务:

// 不支持关闭的 生产者-消费者服务,没有终止线程的方法
public class LogWriter {
    private final BlockingQueue<String> queue;
    private final LoggerThread logger;
    private static final int CAPACITY = 1000;

    public LogWriter(Writer writer) {
        this.queue = new LinkedBlockingQueue<String>(CAPACITY);
        this.logger = new LoggerThread(writer);
    }

    // 开始产生日志
    public void start() {
        logger.start();
    }

    // 将日志放入阻塞队列中
    public void log(String msg) throws InterruptedException {
        queue.put(msg);
    }


    // 消费者,将队列中的日志写入文件
    private class LoggerThread extends Thread {
        private final PrintWriter writer;

        public LoggerThread(Writer writer) {
            this.writer = new PrintWriter(writer, true);
        }

        @Override
        public void run() {
            try {
                while (true) {
                    writer.println(queue.take());
                }
            } catch (InterruptedException e) {
                // 忽略
            } finally {
                writer.close();
            }
        }
    }
}

为了使像 LogWriter 这样的服务在软件产品中能发挥实际的作用,还需要实现一种 终止日志线程的方法,从而避免使 JVM 无法正常关闭。

停止日志线程是很容易的,因为它会反复调用 take,而 take方法可以响应中断。 如果将日志线程修改为捕获到 InterruptException 时退出,那么只需要中断日志线程就可以停止服务。 <--- 【使线程退出的一种解决方案,但并不是一个完善的关闭机制,会丢失队列中的信息】

然而**,如果只是日志线程退出,那么还不是一种完备的关闭机制**。 这种直接关闭的做法会丢失那些正在等待被写入到日志的信息,不仅如此,其他下次线程将在调用 log 时被阻塞,因为日志消息队列是满的,因此这些线程将无法解除阻塞状态

取消一个 生产者—消费者 操作时,需要同时取消生产者和消费者。 在中断日志线程时会处理消费者,但在这个示例中,由于生产者并不是专门的线程,因此要取消它们将非常困难

另一种关闭 LogWriter 的方法是:设置某个 「已请求关闭」的标志位,以避免进一步调教日志消息,如程序清单 7-14所示 在收到关闭请求后,消费者会把队列中的所有消息写入日志,并解除所有在调用 log 时阻塞的生产者。 然而这个方法中存在着 「竞态条件」 问题,使得该方法并不可靠。

log 的实现是一种**「先判断,再运行」的代码序列:生产者发现该服务还没有关闭,因此在关闭服务后仍然会将日志消息放入队列,这同样会使得生产者可能在调用 log 时阻塞并无法接触阻塞状态。 可以通过一些技巧来降低这种情况的发生概率(例如:在宣布队列在清空之前,让消费者等待数秒)但这些都没有解决问题的本质**:即使很小的概率也可能导致程序发生故障。

程序清单 7-14 : 通过一种不可靠的方式为日志服务增加关闭支持:

public void log(String msg) throws InterruptedException {
	if(!shutdownRequested) {
		queue.put(msg);
	} else {
		throw new IllegalStateException("logger is shut down")
	}
}

LogWriter 提供可靠的关闭操作的方法是解决竞态条件问题,因而要使日志消息的提交成为 「原子操作」。然而我们不希望在消息加入队列时持有一个锁,因为 put 方法本身就可以阻塞。

我们采用的方法是:通过原子方式检查关闭请求,并且有条件地递增一个计数器来"保持"提交消息的权利。

程序清单 7-15 向 LogWriter 添加可靠的取消操作:

// 存在可靠的取消服务的 LogWriter 类 通过原子方式检查关闭请求,有条件地递增一个计数器来保证提交消息的权利
public class LogService {
    private final BlockingQueue<String> queue;
    private final LoggerThread loggerThred;
    private final PrintWriter writer;

    @GuardedBy("this")
    private boolean isShutdown;
    @GuardedBy("this")
    private int reservations;

    public LogService(PrintWriter writer) {
        this.queue = new LinkedBlockingQueue<>();
        this.loggerThred = new LoggerThread();
        this.writer = new PrintWriter(writer);
    }

    public void start() {
        loggerThred.start();
    }

    /**
     * 关闭服务的方法,将中断标志设为true,并且申请中断该线程
     */
    public void stop() {
        synchronized (this) {
            isShutdown = true;
        }
        // 中断该线程
        loggerThred.interrupt();
    }

    /**
     * 产生 log 信息的方法,当服务被终止时 调用该方法抛出异常
     *
     * @param msg 日志信息
     * @throws InterruptedException
     */
    public void log(String msg) throws InterruptedException {
        synchronized (this) {
            if (isShutdown) {
                throw new IllegalStateException("线程已被终止");
            }
            // 工作队列中待处理信息的数量
            ++reservations;
        }
        queue.put(msg);
    }

    // 写入日志的工作线程
    private class LoggerThread extends Thread {
        @Override
        public void run() {
            try {
                while (true) {
                    try {
                        synchronized (LogService.this) {
                            if (isShutdown && reservations == 0) {
                                break;
                            }
                        }
                        final String msg = queue.take();
                        synchronized (LogService.this) {
                            --reservations;
                        }
                        // 写入数据
                        writer.println(msg);
                    } catch (InterruptedException e) {
                        // 重试
                    }
                }
            } finally {
                writer.close();
            }
        }
    }
}

【↑ 可以看到这里增加了可靠的对服务的取消的方法,使用内置锁保证了变量不会产生竞态条件问题,使日志消息的提交成为原子操作。】

7.2.2 关闭 ExecutorService

6.2.4 节中,我们看到 ExecutorService 提供了两种关闭方法:

  • 使用 shutdown 正常关闭。
  • 使用 shutdownNow 强行关闭。

在进行强行关闭时,shutdownNow 首先关闭正在执行的任务,然后返回所有未启动的任务清单。

这两种关闭的方式的差别在于各自的「安全性」 和 「响应性」: 强行关闭的关闭速度更快,但相应的,其风险也更大,因为任务很可能在执行到一半时被结束而正常关闭虽然速度慢,但却更安全,因为 ExecutorService 会一直等到队列中的所有任务都执行完才关闭。

在其他拥有线程的服务中,也应该考虑提供类似的关闭方式以供选择。

【↑ 意思是我们需要提供一个快速关闭的服务,和一个安全关闭的服务,这样可以适用于不同的场景。】

简单的程序可以直接在 main 函数中启动和关闭全局的 ExecutorService

而在复杂程序中,通常会将 ExecutorService 封装在某个更高级别的服务中,并且该服务能提供其自己的生命周期方法

例如程序清单 7-16LogService 的一种变化形式,它将管理线程的工作委托给一个 ExecutorService,而不是由其自行管理。

通过封装 ExecutorService 可以将「所有权链(Ownership Chain 」 从应用程序扩展到服务以及线程,所有权链上的各个成员都将管理它所拥有的服务或线程的生命周期。

【↑ 这句话很重要,点明了封装 ExecutorService 的意义所在。】

程序清单 7-16 使用 ExecutorService 的日志服务:

public class LogService {
	private final ExecutorService exec = newSingleThreadExecutor();
	...
	public void start() { }
  // 封装 ExecutorService 的关闭该方法
	public void stop() throws InterruptException {
		try {
			exec.shutdown();
			exec.awatiTermination(TIMEOUT,UNIT)
		} finally {
			writer.closer();
		}
	}
  
  public void log(String msg) {
    try {
      	exec.execute(new WriterTask(msg));
    } catch (RejectedExecutionException ignored) { // 忽略该异常}
  }
}

7.2.3 "毒丸" 对象

另一种关闭 「生产者—消费者」 服务的方式就是使用「毒丸」 Poison Pill 对象。

所谓 「毒丸」 是指: 一个放在队列上的对象,其含义是:当得到这个对象的时候,立即停止。 在FIFO 先进先出队列中,"毒丸" 对象将确保消费者在关闭之前首先完成队列中的所有工作,在提交 "毒丸" 对象之前提交的所有工作都会被处理,而生产者在提交了毒丸对象之后,将不会再提交任何工作。

【↑ 也就是代表了关闭意义的一个对象】

在程序清单 7-177-187-19 中给出一个单 生产者—消费者的 桌面搜索示例(来自程序清单 5-8) ,在这个示例中,使用了 "毒丸" 对象来关闭服务

程序清单 7-17 通过 毒丸 对象来关闭服务:(7-18,7-19 分别为具体的生产者和消费者代码片段,这里融合在一个完整的类中)

// 使用毒丸对象终止服务
public class IndexingService {
    private static final int CAPACITY = 1000;
    private static final File POISON = new File("");
    private final IndexerThread consumer = new IndexerThread();
    private final CrawlerThread producer = new CrawlerThread();
    private final BlockingQueue<File> queue;
    private final FileFilter fileFilter;
    private final File root;


    public IndexingService(BlockingQueue<File> queue, FileFilter fileFilter, File root) {
        this.queue = new LinkedBlockingQueue<>(CAPACITY);
        this.fileFilter = f -> f.isDirectory() || fileFilter.accept(f);
        this.root = root;
    }

    private boolean alreadyIndexed(File entry) {
        return false;
    }

    // 消费者的逻辑,判断从队列中取到是否为毒丸对象,如果不是就持续消费
    class IndexerThread extends Thread {
        @Override
        public void run() {
            try {
                while (true) {
                    final File file = queue.take();
                    if (file == POISON) {
                        break;
                    } else {
                        indexFile(file);
                    }
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        private void indexFile(File file) {
            // ... 具体业务逻辑 这里省略
        }
    }

    // 生产者的逻辑,当文件抓取完成后持续向队列中放入毒丸对象
    class CrawlerThread extends Thread {
        @Override
        public void run() {
            try {
                crawl(root);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                // 当抓取完成后开始持续放入毒丸对象来关闭这个任务
                while (true) {
                    try {
                        // 放入毒丸对象
                        queue.put(POISON);
                        break;
                    } catch (InterruptedException e) {
                        // 重试
                    }
                }
            }
        }
    }

    // 抓取文件的方法
    private void crawl(File root) throws InterruptedException {
        final File[] entries = root.listFiles(fileFilter);

        if (entries != null) {
            for (File entry : entries) {
                if (entry.isDirectory()) {
                    crawl(entry);
                } else if (!alreadyIndexed(entry)) {
                    queue.put(entry);
                }
            }
        }
    }
}

只有在生产者和消费者数量都已知的情况下,才可以使用"毒丸"对象。

IndexingService中采用的解决方案可以扩展到 「多个生产者」: 只需给每个生产者队列中都放入一个"毒丸"对象,并且消费者仅当在接收 生产者数量个毒丸对象时才会停止。

这种方法也可以扩展到多个消费者:只需生产者将 消费者数量的毒丸对象放入队列。

然而,当生产者和消费者的数量较大时,这种方法变得难以使用。 <---【瓶颈】

只有在无界队列中,"毒丸"对象才能可靠的工作。<---【数据结构要求】

7.2.4 示例:只执行一次的服务

如果某个方法需要处理一批任务,并且当所有任务都处理完成后才返回,那么可以通过一个私有的 Executor 来简化服务的生命周期管理,其中该 Executor 的生命周期是由这个方法来控制的。(在这种情况下 invokeAllinvokeAny 等方法通常会起较大作用)

【↑ 给了场景和解决方案,需要记录在册】

7.2.5 shutdownNow 的局限性

当通过 shutdonwNow 来强行关闭 ExecutorService 时,它会尝试取消正在执行的任务,并返回所有已提交但尚未开始的任务,从而将这些任务写入日志或者保存起来以便之后进行处理。

shutdownNow 返回的 Runnable 对象可能与提交给 ExecutorServiceRunnable 对象 「不相同」:它们可能是已经被封装过的已提交任务。】

然而我们无法通过「常规」 的方法来找出哪些任务已经开始但尚未结束。 这意味着我们无法在关闭过程中知道正在执行的任务的状态,除非任务本身会执行某种检查。

要知道哪些任务还没有完成,你不仅需要知道哪些任务还没有开始,还需要知道当 Executor 关闭时,哪些任务正在执行。【然而,在关闭过程中只会返回尚未开始的任务,而不会返回正在执行的任务。 如果能返回这两种类型的任务,那么就不需要这种不确定的中间状态】

程序清单 7-21ExecutorService跟踪在关闭之后被取消的任务

// 跟踪 ExecutorService 中被关闭的任务
public class TrackingExecutor extends AbstractExecutorService {
    private final ExecutorService exec;
    // 用来保存被关闭的任务
    private final Set<Runnable> taskCancelledAtShutdown = Collections.synchronizedSet(new HashSet<>());

    public TrackingExecutor(ExecutorService exec) {
        this.exec = exec;
    }

    @Override
    public void shutdown() {
        exec.shutdown();
    }

    @Override
    public List<Runnable> shutdownNow() {
        return exec.shutdownNow();
    }

    @Override
    public boolean isShutdown() {
        return exec.isShutdown();
    }

    @Override
    public boolean isTerminated() {
        return exec.isTerminated();
    }

    @Override
    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
        return exec.awaitTermination(timeout,unit);
    }

    public List<Runnable> getCancelledTasks() {
        if (!exec.isTerminated()) {
            throw new IllegalStateException("当前服务尚未关闭");
        }
        return new ArrayList<>(taskCancelledAtShutdown);
    }

    @Override
    public void execute(Runnable runnable) {
        exec.execute(() -> {
            try {
                runnable.run();
            }finally {
                if (isShutdown() && Thread.currentThread().isInterrupted()) {
                    taskCancelledAtShutdown.add(runnable);
                }
            }
        });
    }
}

程序清单 7-22WebCrawler 中给出了 TrackingExecutor 的用法。 网页爬虫程序的工作通常是无穷尽一直进行的,因此当爬虫程序必须关闭时,我们通常希望保存它的状态,以便稍后重新启动。

CrawlTask 提供了一个 getPage 方法,该方法能找出正在处理的页面。

当爬虫程序关闭时,无论是**「尚未开始新任务」** 还是 「那些被取消的任务」,都会记录它们的 URL,当爬虫程序重新启动,就可以将这些 URL 的页面抓取任务加入到任务队列中重新执行。

↑【断点续传】

程序清单 7-22 使用 TrackingExecutorService 来保存未完成的任务以备后续执行:

// 使用 TrackingExecutorService 来保存未完成的抓取任务,以便下次启动时重新抓取
public abstract class WebCrawler {
    private volatile TrackingExecutor exec;
    @GuardedBy("this")
    private final Set<URL> urlsToCrawl = new HashSet<>();
    private final ConcurrentMap<URL, Boolean> seen = new ConcurrentHashMap<>();
    private static final long TIMEOUT = 500;
    private static final TimeUnit UNIT = TimeUnit.MILLISECONDS;

    public WebCrawler(URL startUrl) {
        urlsToCrawl.add(startUrl);
    }

    public synchronized void start() {
        exec = new TrackingExecutor(Executors.newCachedThreadPool());
        urlsToCrawl.forEach(this::submitCrawlTask);
        urlsToCrawl.clear();
    }

    public synchronized void stop() throws InterruptedException {
        try {
            saveUncrawled(exec.shutdownNow());
            // 作用?
            if (exec.awaitTermination(TIMEOUT, UNIT)) {
                saveUncrawled(exec.getCancelledTasks());
            }
        } finally {
            exec = null;
        }
    }

    protected abstract List<URL> processPage(URL url);

    // 保存尚未执行的抓取任务
    private void saveUncrawled(List<Runnable> unCrawledTask) {
        for (Runnable task : unCrawledTask) {
            urlsToCrawl.add(((CrawlTask) task).getPage());
        }
    }

    private void submitCrawlTask(URL url) {
        exec.execute(new CrawlTask(url));
    }

    private class CrawlTask implements Runnable {
        private final URL url;


        public CrawlTask(URL url) {
            this.url = url;
        }

        // 这个计数器没有使用上
        private int count = 1;

        // 判断该任务是否已经被抓取过
        boolean alreadyCrawled() {
            return seen.putIfAbsent(url, true) != null;
        }

        // 将已经抓取的任务移除队列
        void makeUncrawled() {
            seen.remove(url);
            System.out.printf("marking %s uncrawled %n", url);
        }

        @Override
        public void run() {
            for (URL link : processPage(url)) {
                if (Thread.currentThread().isInterrupted()) {
                    return;
                }
                submitCrawlTask(link);
            }
        }

        public URL getPage() {
            return url;
        }
    }
}

TrackingExecutor 中存在一个**「不可避免的竞态条件」**,从而产生误报问题:一些被认为已经取消的任务实际上已经执行完成。

产生这个问题的原因是:在任务执行最后一条指令以及线程池将任务记录为"结束"的两个时刻之间,线程池可能关闭。

如果任务是**「幂等」**的,(任务多次执行与执行一次的结果相同),那么不会存在问题,在网页爬虫中就是这种情况。

否则,在应用程序中必须考虑这种风险,并对误报问题做好准备。

↑【给出了问题,给出了原因,给出了场景,包括这个代码,都很值得好好消化,反复咀嚼。】

Metadata

Metadata

Assignees

No one assigned

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions