通常,Java I/O 框架用途極其廣泛。同一個(gè)框架支持文件存取、網(wǎng)絡(luò)訪問(wèn)、字符轉(zhuǎn)換、壓縮和加密等等。不過(guò),有時(shí)它不是十分靈活。例如,壓縮流允許您將數(shù)據(jù)寫(xiě)成壓縮格式,但它們不能讓您讀取壓縮格式的數(shù)據(jù)。同樣地,某些第三方模塊被構(gòu)建成寫(xiě)出數(shù)據(jù),而沒(méi)有考慮應(yīng)用程序需要讀取數(shù)據(jù)的情形。本文是兩部分系列文章的第一部分,Java 密碼專家和作家 Merlin Hughes 介紹了使應(yīng)用程序從僅支持將數(shù)據(jù)寫(xiě)至輸出流的源中有效讀取數(shù)據(jù)的框架。
自早期基于瀏覽器的 applet 和簡(jiǎn)單應(yīng)用程序以來(lái),Java 平臺(tái)已有了巨大的發(fā)展?,F(xiàn)在,我們有多個(gè)平臺(tái)和概要及許多新的 API,并且還在制作的差不多有數(shù)百種之多。盡管 Java 語(yǔ)言的復(fù)雜程度在不斷增加,但它對(duì)于日常的編程任務(wù)而言仍是一個(gè)出色的工具。雖然有時(shí)您會(huì)陷入那些日復(fù)一日的編程問(wèn)題中,但偶爾您也能夠回過(guò)頭去,發(fā)現(xiàn)一個(gè)很棒的解決方案來(lái)處理您以前曾多次遇到過(guò)的問(wèn)題。
就在前幾天,我想要壓縮一些通過(guò)網(wǎng)絡(luò)連接讀取的數(shù)據(jù)(我以壓縮格式將 TCP 數(shù)據(jù)中繼到一個(gè) UDP 套接字)。記得 Java 平臺(tái)自版本 1.1 開(kāi)始就支持壓縮,所以我直接求助于 java.util.zip
包,希望能找到一個(gè)適合于我的解決方案。然而,我發(fā)現(xiàn)一個(gè)問(wèn)題:構(gòu)造的類都適用于常規(guī)情況,即在讀取時(shí)對(duì)數(shù)據(jù)解壓縮而在寫(xiě)入時(shí)壓縮它們,沒(méi)有其它變通方法。雖然繞過(guò) I/O 類是可能的,但我希望構(gòu)建一個(gè)基于流的解決方案,而不想偷懶直接使用壓縮程序。
不久以前,我在另一種情況下也遇到過(guò)完全相同的問(wèn)題。我有一個(gè) base-64 轉(zhuǎn)碼庫(kù),與使用壓縮包一樣,它支持對(duì)從流中讀取的數(shù)據(jù)進(jìn)行譯碼,并對(duì)寫(xiě)入流中的數(shù)據(jù)進(jìn)行編碼。然而,我需要的是一個(gè)在我從流中讀取數(shù)據(jù)的同時(shí)可以進(jìn)行編碼的庫(kù)。
在我著手解決該問(wèn)題時(shí),我認(rèn)識(shí)到我在另一種情況下也遇到過(guò)該問(wèn)題:當(dāng)序列化 XML 文檔時(shí),通常會(huì)循環(huán)遍歷整個(gè)文檔,將節(jié)點(diǎn)寫(xiě)入流中。然而,我遇到的情況是需要讀取序列化格式的文檔,以便將子集重新解析成一個(gè)新文檔。
回過(guò)頭想一下,我意識(shí)到這些孤立事件表示了一個(gè)共性的問(wèn)題:如果有一個(gè)遞增地將數(shù)據(jù)寫(xiě)入輸出流的數(shù)據(jù)源,那么我需要一個(gè)輸入流使我能夠讀取這些數(shù)據(jù),每當(dāng)需要更多數(shù)據(jù)時(shí),都能透明地訪問(wèn)數(shù)據(jù)源。
在本文中,我們將研究對(duì)這一問(wèn)題的三種可能的解決方案,同時(shí)決定一個(gè)實(shí)現(xiàn)最佳解決方案的新框架。然后,我們將針對(duì)上面列出的每個(gè)問(wèn)題,檢驗(yàn)該框架。我們將扼要地談及性能方面的問(wèn)題,而把對(duì)此的大量討論留到下一篇文章中。
I/O 流基礎(chǔ)知識(shí)
首先,讓我們簡(jiǎn)單回顧一下 Java 平臺(tái)的基本流類,如圖 1 所示。 OutputStream
表示對(duì)其寫(xiě)入數(shù)據(jù)的流。通常,該流將直接連接至諸如文件或網(wǎng)絡(luò)連接之類的設(shè)備,或連接至另一個(gè)輸出流(在這種情況下,它稱為 過(guò)濾器(filter))。通常,輸出流過(guò)濾器在轉(zhuǎn)換了寫(xiě)入其中的數(shù)據(jù)之后,才將轉(zhuǎn)換后產(chǎn)生的數(shù)據(jù)寫(xiě)入相連的流中。 InputStream
表示可以從中讀取數(shù)據(jù)的流。同樣,該流也直接連接至設(shè)備或其它流。輸入流過(guò)濾器從相連的流中讀取數(shù)據(jù),轉(zhuǎn)換該數(shù)據(jù),然后允許從中讀取轉(zhuǎn)換后的數(shù)據(jù)。
圖 1. I/O 流基礎(chǔ)知識(shí)
就我最初的問(wèn)題看, GZIPOutputStream
類是一個(gè)輸出流過(guò)濾器,它壓縮寫(xiě)入其中的數(shù)據(jù),然后將該壓縮數(shù)據(jù)寫(xiě)入相連的流。我需要的輸入流過(guò)濾器應(yīng)該能從流中讀取數(shù)據(jù),壓縮數(shù)據(jù),然后讓我讀取結(jié)果。
Java 平臺(tái),版本 1.4 已引入了一個(gè)新的 I/O 框架 java.nio
。不過(guò),該框架在很大程度上與提供對(duì)操作系統(tǒng) I/O 資源的有效訪問(wèn)有關(guān);而且,雖然它確實(shí)為一些傳統(tǒng)的 java.io
類提供了類似功能,并可以表示同時(shí)支持輸入和輸出的雙重用途的資源,但它并不能完全替代標(biāo)準(zhǔn)流類,并且不能直接處理我需要解決的問(wèn)題。
蠻力解決方案
在著手尋找解決我問(wèn)題的工程方案前,我根據(jù)標(biāo)準(zhǔn) Java API 類的精致和有效性,研究了基于這些類的解決方案。
該問(wèn)題的蠻力解決方案就是簡(jiǎn)單地從輸入源中讀取所有數(shù)據(jù),然后通過(guò)轉(zhuǎn)換程序(即,壓縮流、編碼流或 XML 序列化器)將它們推進(jìn)內(nèi)存緩沖區(qū)中。然后,我可以從該內(nèi)存緩沖區(qū)中打開(kāi)要讀取的流,這樣我就解決了問(wèn)題。
首先,我需要一個(gè)通用的 I/O 方法。清單 1 中的方法利用一個(gè)小緩沖區(qū)將 InputStream
中的所有數(shù)據(jù)復(fù)制到 OutputStream
。當(dāng)?shù)竭_(dá)輸入的結(jié)尾( read()
函數(shù)的返回值小于零)時(shí),該方法就返回,但不關(guān)閉這兩個(gè)流。
清單 1. 通用的 I/O 方法
public static void io (InputStream in, OutputStream out)
throws IOException {
byte[] buffer = new byte[8192];
int amount;
while ((amount = in.read (buffer)) >= 0)
out.write (buffer, 0, amount);
}
|
清單 2 顯示蠻力解決方案如何使我讀取壓縮格式的輸入流。我打開(kāi)寫(xiě)入內(nèi)存緩沖區(qū)的 GZIPOutputStream
(使用 ByteArrayOutputStream
)。接著,將輸入流復(fù)制到壓縮流中,這樣將壓縮數(shù)據(jù)填入內(nèi)存緩沖區(qū)中。然后,我返回 ByteArrayInputStream
,它讓我從輸入流中讀取,如圖 2 所示。
圖 2. 蠻力解決方案
清單 2. 蠻力解決方案
public static InputStream bruteForceCompress (InputStream in)
throws IOException {
ByteArrayOutputStream sink = new ByteArrayOutputStream ():
OutputStream out = new GZIPOutputStream (sink);
io (in, out);
out.close ();
byte[] buffer = sink.toByteArray ();
return new ByteArrayInputStream (buffer);
}
|
這個(gè)解決方案有一個(gè)明顯的缺點(diǎn),它將整個(gè)壓縮文檔都存儲(chǔ)在內(nèi)存中。如果文檔很大,那么這種方法將不必要地浪費(fèi)系統(tǒng)資源。使用流的主要特性之一是它們?cè)试S您操作比所用系統(tǒng)內(nèi)存要大的數(shù)據(jù):您可以在讀取數(shù)據(jù)時(shí)處理它們,或在寫(xiě)入數(shù)據(jù)時(shí)生成數(shù)據(jù),而無(wú)需始終將所有數(shù)據(jù)保存在內(nèi)存中。
從效率上,讓我們對(duì)在緩沖區(qū)之間復(fù)制數(shù)據(jù)進(jìn)行更深入研究。
通過(guò) io()
方法,將數(shù)據(jù)從輸入源讀入至一個(gè)緩沖區(qū)中。然后,將數(shù)據(jù)從緩沖區(qū)寫(xiě)入 ByteArrayOutputStream
中的緩沖區(qū)(通過(guò)我忽略的壓縮過(guò)程)。然而, ByteArrayOutputStream
類對(duì)擴(kuò)展的內(nèi)部緩沖區(qū)進(jìn)行操作;每當(dāng)緩沖區(qū)變滿時(shí),就會(huì)分配一個(gè)大小是原來(lái)兩倍的新緩沖區(qū),接著將現(xiàn)有的數(shù)據(jù)復(fù)制到該緩沖區(qū)中。平均下來(lái),這一過(guò)程每個(gè)字節(jié)復(fù)制兩次。(算術(shù)計(jì)算很簡(jiǎn)單:當(dāng)進(jìn)入 ByteArrayOutputStream
時(shí),對(duì)數(shù)據(jù)平均復(fù)制兩次;所有數(shù)據(jù)至少?gòu)?fù)制一次;有一半數(shù)據(jù)至少?gòu)?fù)制兩次;四分之一的數(shù)據(jù)至少?gòu)?fù)制三次,依次類推。)然后,將數(shù)據(jù)從該緩沖區(qū)復(fù)制到 ByteArrayInputStream
的一個(gè)新緩沖區(qū)中?,F(xiàn)在,應(yīng)用程序可以讀取數(shù)據(jù)了??傊?,這個(gè)解決方案將通過(guò)四個(gè)緩沖區(qū)寫(xiě)數(shù)據(jù)。這對(duì)于估計(jì)其它技術(shù)的效率是一個(gè)有用的基準(zhǔn)。
管道式流解決方案
管道式流 PipedOutputStream
和 PipedInputStream
在 Java 虛擬機(jī)的線程之間提供了基于流的連接。一個(gè)線程將數(shù)據(jù)寫(xiě)入 PipedOutputStream
中的同時(shí),另一個(gè)線程可以從相關(guān)聯(lián)的 PipedInputStream
中讀取該數(shù)據(jù)。
就這樣,這些類提供了一個(gè)針對(duì)我問(wèn)題的解決方案。清單 3 顯示了使用一個(gè)線程通過(guò) GZIPOutputStream
將數(shù)據(jù)從輸入流復(fù)制到 PipedOutputStream
的代碼。然后,相關(guān)聯(lián)的 PipedInputStream
將提供對(duì)來(lái)自另一個(gè)線程的壓縮數(shù)據(jù)的讀取權(quán),如圖 3 所示:
圖 3. 管道式流解決方案
清單 3. 管道式流解決方案
private static InputStream pipedCompress (final InputStream in)
throws IOException {
PipedInputStream source = new PipedInputStream ();
final OutputStream out =
new GZIPOutputStream (new PipedOutputStream (source));
new Thread () {
public void run () {
try {
Streams.io (in, out);
out.close ();
} catch (IOException ex) {
ex.printStackTrace ();
}
}
}.start ();
return source;
}
|
理論上,這可能是個(gè)好技術(shù):通過(guò)使用線程(一個(gè)執(zhí)行壓縮,另一個(gè)處理產(chǎn)生的數(shù)據(jù)),應(yīng)用程序可以從硬件 SMP(對(duì)稱多處理)或 SMT(對(duì)稱多線程)中受益。另外,這一解決方案僅涉及兩個(gè)緩沖區(qū)寫(xiě)操作:I/O 循環(huán)將數(shù)據(jù)從輸入流讀入緩沖區(qū),然后通過(guò)壓縮流寫(xiě)入 PipedOutputStream
。接著,輸出流將數(shù)據(jù)存儲(chǔ)在內(nèi)部緩沖區(qū)中,與 PipedInputStream
共享緩沖區(qū)以供應(yīng)用程序讀取。而且,因?yàn)閿?shù)據(jù)通過(guò)固定緩沖區(qū)流動(dòng),所以從不需要將它們完全讀入內(nèi)存中。事實(shí)上,在任何給定時(shí)刻,緩沖區(qū)都只存儲(chǔ)小部分的工作集。
不過(guò),實(shí)際上,它的性能很糟糕。管道式流需要利用同步,從而引起兩個(gè)線程之間激烈爭(zhēng)奪同步。它們的內(nèi)部緩沖區(qū)太小,無(wú)法有效地處理大量數(shù)據(jù)或隱藏鎖爭(zhēng)用。其次,持久共享緩沖區(qū)會(huì)阻礙許多簡(jiǎn)單的高速緩存策略共享 SMP 機(jī)器上的工作負(fù)載。最后,線程的使用使得異常處理極其困難:沒(méi)有辦法將可能出現(xiàn)的任何 IOException
下推到管道中以便閱讀器處理。總之,這一解決方案太難處理,根本不實(shí)際。
 |
同步問(wèn)題
本文中提供的代碼都不能同步;也就是說(shuō),兩個(gè)線程并發(fā)地訪問(wèn)其中一個(gè)類的共享實(shí)例是不安全的。 因?yàn)槿?NIO 框架和 Collections API 之類的庫(kù)已經(jīng)公認(rèn)是實(shí)用的,所以使得同步成為應(yīng)用程序中的一種負(fù)擔(dān)。如果應(yīng)用程序希望對(duì)一個(gè)對(duì)象進(jìn)行并發(fā)訪問(wèn),應(yīng)用程序必須采取必要的步驟來(lái)同步訪問(wèn)。 雖然最近的 JVM 已在其線程安全性機(jī)制的性能上有了很大的改進(jìn),但同步仍是一個(gè)開(kāi)銷很大的操作。在 I/O 的情況下,對(duì)單個(gè)流的并發(fā)訪問(wèn)幾乎必定是一個(gè)錯(cuò)誤;結(jié)果數(shù)據(jù)流的次序是不確定的,這不是理想的情形。正因?yàn)檫@樣,要同步這些類會(huì)強(qiáng)加一些不必要的費(fèi)用,又沒(méi)有確實(shí)的收益。 我們將在這一系列文章的第 2 部分更詳細(xì)討論多線程的考慮事項(xiàng);目前,只要注意:對(duì)我所提供的對(duì)流的并發(fā)訪問(wèn)將導(dǎo)致不確定錯(cuò)誤。
|
|
工程解決方案
現(xiàn)在,我們將研究另一種解決該問(wèn)題的工程方案。這種解決方案提供了一個(gè)特地為解決這類問(wèn)題而設(shè)計(jì)的框架,該框架提供了對(duì)數(shù)據(jù)的 InputStream
訪問(wèn),這些數(shù)據(jù)是從遞增地向 OutputStream
寫(xiě)入數(shù)據(jù)的源中產(chǎn)生的。遞增地寫(xiě)入數(shù)據(jù)這一事實(shí)很重要。如果源在單個(gè)原子操作中將所有數(shù)據(jù)都寫(xiě)入 OutputStream
,而且如果不使用線程,則我們基本上又回到了蠻力技術(shù)的老路上。不過(guò),如果可以訪問(wèn)源以遞增地寫(xiě)入其數(shù)據(jù),則我們就實(shí)現(xiàn)了在蠻力和管道式流解決方案之間的良好平衡。該解決方案不僅提供了在任何時(shí)候只在內(nèi)存中保存少量數(shù)據(jù)的管道式優(yōu)點(diǎn),同時(shí)也提供了避免線程的蠻力技術(shù)的優(yōu)點(diǎn)。
圖 4 演示了完整的解決方案。我們將在本文的剩余部分研究 該解決方案的源代碼。
圖 4. 工程解決方案
輸出引擎
清單 4 提供了一個(gè)描述數(shù)據(jù)源的接口 OutputEngine
。正如我所說(shuō)的,這些源遞增地將數(shù)據(jù)寫(xiě)入輸出流:
清單 4. 輸出引擎
package org.merlin.io;
import java.io.*;
/**
* An incremental data source that writes data to an OutputStream.
*
* @author Copyright (c) 2002 Merlin Hughes <merlin@merlin.org>
*
* This program is free software; you can redistribute
* it and/or modify it under the terms of the GNU
* General Public License as published by the Free
* Software Foundation; either version 2
* of the License, or (at your option) any later version.
*/
public interface OutputEngine {
public void initialize (OutputStream out) throws IOException;
public void execute () throws IOException;
public void finish () throws IOException;
}
|
initialize()
方法向該引擎提供一個(gè)流,應(yīng)該向這個(gè)流寫(xiě)入數(shù)據(jù)。然后,重復(fù)調(diào)用 execute()
方法將數(shù)據(jù)寫(xiě)入該流中。當(dāng)數(shù)據(jù)寫(xiě)完時(shí),引擎會(huì)關(guān)閉該流。最后,當(dāng)引擎應(yīng)該關(guān)閉時(shí),將調(diào)用 finish()
。這會(huì)發(fā)生在引擎關(guān)閉其輸出流的前后。
I/O 流引擎
輸出引擎解決了讓我費(fèi)力處理的問(wèn)題,它是一個(gè)通過(guò)輸出流過(guò)濾器將數(shù)據(jù)從輸入流復(fù)制到目標(biāo)輸出流的引擎。這滿足了遞增性的特性,因?yàn)樗梢砸淮巫x寫(xiě)單個(gè)緩沖區(qū)。
清單 5 到 10 中的代碼實(shí)現(xiàn)了這樣的一個(gè)引擎。通過(guò)輸入流和輸入流工廠來(lái)構(gòu)造它。清單 11 是一個(gè)生成過(guò)濾后的輸出流的工廠;例如,它會(huì)返回包裝了目標(biāo)輸出流的 GZIPOutputStream
。
清單 5. I/O 流引擎
package org.merlin.io;
import java.io.*;
/**
* An output engine that copies data from an InputStream through
* a FilterOutputStream to the target OutputStream.
*
* @author Copyright (c) 2002 Merlin Hughes <merlin@merlin.org>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*/
public class IOStreamEngine implements OutputEngine {
private static final int DEFAULT_BUFFER_SIZE = 8192;
private InputStream in;
private OutputStreamFactory factory;
private byte[] buffer;
private OutputStream out;
|
該類的構(gòu)造器只初始化各種變量和將用于傳輸數(shù)據(jù)的緩沖區(qū)。
清單 6. 構(gòu)造器
public IOStreamEngine (InputStream in, OutputStreamFactory factory) {
this (in, factory, DEFAULT_BUFFER_SIZE);
}
public IOStreamEngine
(InputStream in, OutputStreamFactory factory, int bufferSize) {
this.in = in;
this.factory = factory;
buffer = new byte[bufferSize];
}
|
在 initialize()
方法中,該引擎調(diào)用其工廠來(lái)封裝與其一起提供的 OutputStream
。該工廠通常將一個(gè)過(guò)濾器連接至 OutputStream
。
清單 7. initialize() 方法
public void initialize (OutputStream out) throws IOException {
if (this.out != null) {
throw new IOException ("Already initialised");
} else {
this.out = factory.getOutputStream (out);
}
}
|
在 execute()
方法中,引擎從 InputStream
中讀取一個(gè)緩沖區(qū)的數(shù)據(jù),然后將它們寫(xiě)入已封裝的 OutputStream
;或者,如果輸入結(jié)束,它會(huì)關(guān)閉 OutputStream
。
清單 8. execute() 方法
public void execute () throws IOException {
if (out == null) {
throw new IOException ("Not yet initialised");
} else {
int amount = in.read (buffer);
if (amount < 0) {
out.close ();
} else {
out.write (buffer, 0, amount);
}
}
}
|
最后,當(dāng)關(guān)閉引擎時(shí),它就關(guān)閉其 InputStream
。
清單 9. 關(guān)閉 InputStream
public void finish () throws IOException {
in.close ();
}
|
內(nèi)部 OutputStreamFactory
接口(下面清單 10 中所示)描述可以返回過(guò)濾后的 OutputStream
的類。
清單 10. 內(nèi)部輸出流工廠接口
public static interface OutputStreamFactory {
public OutputStream getOutputStream (OutputStream out)
throws IOException;
}
}
|
清單 11 顯示將提供的流封裝到 GZIPOutputStream
中的一個(gè)示例工廠:
清單 11. GZIP 輸出流工廠
public class GZIPOutputStreamFactory
implements IOStreamEngine.OutputStreamFactory {
public OutputStream getOutputStream (OutputStream out)
throws IOException {
return new GZIPOutputStream (out);
}
}
|
該 I/O 流引擎及其輸出流工廠框架通常足以支持大多數(shù)的輸出流過(guò)濾需要。
輸出引擎輸入流
最后,我們還需要一小段代碼來(lái)完成這個(gè)解決方案。清單 12 到 16 中的代碼提供了讀取由輸出引擎所寫(xiě)數(shù)據(jù)的輸入流。事實(shí)上,這段代碼有兩個(gè)部分:主類是一個(gè)從內(nèi)部緩沖區(qū)讀取數(shù)據(jù)的輸入流。與此緊密耦合的是一個(gè)輸出流(如清單 17 所示),它把輸出引擎所寫(xiě)的數(shù)據(jù)填充到內(nèi)部讀緩沖區(qū)。
主輸入流類將用其內(nèi)部輸出流來(lái)初始化輸出引擎。然后,每當(dāng)它的緩沖區(qū)為空時(shí),它會(huì)自動(dòng)執(zhí)行該引擎來(lái)接收更多數(shù)據(jù)。輸出引擎將數(shù)據(jù)寫(xiě)入其輸出流中,這將重新填充輸入流的內(nèi)部緩沖區(qū),以允許需要內(nèi)部緩沖區(qū)數(shù)據(jù)的應(yīng)用程序高效地讀取數(shù)據(jù)。
清單 12. 輸出引擎輸入流
package org.merlin.io;
import java.io.*;
/**
* An input stream that reads data from an OutputEngine.
*
* @author Copyright (c) 2002 Merlin Hughes <merlin@merlin.org>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*/
public class OutputEngineInputStream extends InputStream {
private static final int DEFAULT_INITIAL_BUFFER_SIZE = 8192;
private OutputEngine engine;
private byte[] buffer;
private int index, limit, capacity;
private boolean closed, eof;
|
該輸入流的構(gòu)造器獲取一個(gè)輸出引擎以從中讀取數(shù)據(jù)和一個(gè)可選的緩沖區(qū)大小。該流首先初始化其本身,然后初始化輸出引擎。
清單 13. 構(gòu)造器
public OutputEngineInputStream (OutputEngine engine) throws IOException {
this (engine, DEFAULT_INITIAL_BUFFER_SIZE);
}
public OutputEngineInputStream (OutputEngine engine, int initialBufferSize)
throws IOException {
this.engine = engine;
capacity = initialBufferSize;
buffer = new byte[capacity];
engine.initialize (new OutputStreamImpl ());
}
|
代碼的主要讀部分是一個(gè)相對(duì)簡(jiǎn)單的基于字節(jié)數(shù)組的輸入流,與 ByteArrayInputStream
類非常相似。然而,每當(dāng)需要數(shù)據(jù)而該流為空時(shí),它都會(huì)調(diào)用輸出引擎的 execute()
方法來(lái)重新填寫(xiě)讀緩沖區(qū)。然后,將這些新數(shù)據(jù)返回給調(diào)用程序。因而,這個(gè)類將對(duì)輸出引擎所寫(xiě)的數(shù)據(jù)反復(fù)讀取,直到它讀完為止,此時(shí)將設(shè)置 eof
標(biāo)志并且該流將返回已到達(dá)文件末尾的信息。
清單 14. 讀取數(shù)據(jù)
private byte[] one = new byte[1];
public int read () throws IOException {
int amount = read (one, 0, 1);
return (amount < 0) ? -1 : one[0] & 0xff;
}
public int read (byte data[], int offset, int length)
throws IOException {
if (data == null) {
throw new NullPointerException ();
} else if
((offset < 0) || (length < 0) || (offset + length > data.length)) {
throw new IndexOutOfBoundsException ();
} else if (closed) {
throw new IOException ("Stream closed");
} else {
while (index >= limit) {
if (eof)
return -1;
engine.execute ();
}
if (limit - index < length)
length = limit - index;
System.arraycopy (buffer, index, data, offset, length);
index += length;
return length;
}
}
public long skip (long amount) throws IOException {
if (closed) {
throw new IOException ("Stream closed");
} else if (amount <= 0) {
return 0;
} else {
while (index >= limit) {
if (eof)
return 0;
engine.execute ();
}
if (limit - index < amount)
amount = limit - index;
index += (int) amount;
return amount;
}
}
public int available () throws IOException {
if (closed) {
throw new IOException ("Stream closed");
} else {
return limit - index;
}
}
|
當(dāng)操作數(shù)據(jù)的應(yīng)用程序關(guān)閉該流時(shí),它調(diào)用輸出引擎的 finish()
方法,以便可以釋放其正在使用的任何資源。
清單 15. 釋放資源
public void close () throws IOException {
if (!closed) {
closed = true;
engine.finish ();
}
}
|
當(dāng)輸出引擎將數(shù)據(jù)寫(xiě)入其輸出流時(shí),調(diào)用 writeImpl()
方法。它將這些數(shù)據(jù)復(fù)制到讀緩沖區(qū),并更新讀限制索引;這將使新數(shù)據(jù)可自動(dòng)地用于讀方法。
在單次循環(huán)中,如果輸出引擎寫(xiě)入的數(shù)據(jù)比緩沖區(qū)中可以保存的數(shù)據(jù)多,則緩沖區(qū)的容量會(huì)翻倍。然而,這不能頻繁發(fā)生;緩沖區(qū)應(yīng)該快速擴(kuò)展到足夠的大小,以便進(jìn)行狀態(tài)穩(wěn)定的操作。
清單 16. writeImpl() 方法
private void writeImpl (byte[] data, int offset, int length) {
if (index >= limit)
index = limit = 0;
if (limit + length > capacity) {
capacity = capacity * 2 + length;
byte[] tmp = new byte[capacity];
System.arraycopy (buffer, index, tmp, 0, limit - index);
buffer = tmp;
limit -= index;
index = 0;
}
System.arraycopy (data, offset, buffer, limit, length);
limit += length;
}
|
下面清單 17 中顯示的內(nèi)部輸出流實(shí)現(xiàn)表示了一個(gè)流將數(shù)據(jù)寫(xiě)入內(nèi)部輸出流緩沖區(qū)。該代碼驗(yàn)證參數(shù)都是可接受的,并且如果是這樣的話,它調(diào)用 writeImpl()
方法。
清單 17. 內(nèi)部輸出流實(shí)現(xiàn)
private class OutputStreamImpl extends OutputStream {
public void write (int datum) throws IOException {
one[0] = (byte) datum;
write (one, 0, 1);
}
public void write (byte[] data, int offset, int length)
throws IOException {
if (data == null) {
throw new NullPointerException ();
} else if
((offset < 0) || (length < 0) || (offset + length > data.length)) {
throw new IndexOutOfBoundsException ();
} else if (eof) {
throw new IOException ("Stream closed");
} else {
writeImpl (data, offset, length);
}
}
|
最后,當(dāng)輸出引擎關(guān)閉其輸出流,表明它已寫(xiě)入了所有的數(shù)據(jù)時(shí),該輸出流設(shè)置輸入流的 eof
標(biāo)志,表明已經(jīng)讀取了所有的數(shù)據(jù)。
清單 18. 設(shè)置輸入流的 eof 標(biāo)志
public void close () {
eof = true;
}
}
}
|
敏感的讀者可能注意到我應(yīng)該將 writeImpl()
方法的主體直接放在輸出流實(shí)現(xiàn)中:內(nèi)部類有權(quán)訪問(wèn)所有包含類的私有成員。然而,對(duì)這些字段的內(nèi)部類訪問(wèn)比由包含類的直接方法的訪問(wèn)在效率方面稍許差一些。所以,考慮到效率以及為了使類之間的相關(guān)性最小化,我使用額外的助手方法。
應(yīng)用工程解決方案:在讀取期間壓縮數(shù)據(jù)
清單 19 演示了這個(gè)類框架的使用來(lái)解決我最初的問(wèn)題:在我讀取數(shù)據(jù)時(shí)壓縮它們。該解決方案歸結(jié)為創(chuàng)建一個(gè)與輸入流相關(guān)聯(lián)的 IOStreamEngine
和一個(gè) GZIPOutputStreamFactory
,然后將 OutputEngineInputStream
與這個(gè) GZIPOutputStreamFactory
相連。自動(dòng)執(zhí)行流的初始化和連接,然后可以直接從結(jié)果流中讀取壓縮數(shù)據(jù)。當(dāng)處理完成且關(guān)閉流時(shí),輸出引擎自動(dòng)關(guān)閉,并且它關(guān)閉初始輸入流。
清單 19. 應(yīng)用工程解決方案
private static InputStream engineCompress (InputStream in)
throws IOException {
return new OutputEngineInputStream
(new IOStreamEngine (in, new GZIPOutputStreamFactory ()));
}
|
雖然為解決這類問(wèn)題而設(shè)計(jì)的解決方案應(yīng)該產(chǎn)生十分清晰的代碼,這一點(diǎn)沒(méi)有什么可驚奇的,但是通常要充分留意以下教訓(xùn):無(wú)論問(wèn)題大小,應(yīng)用良好的設(shè)計(jì)技術(shù)都幾乎肯定會(huì)產(chǎn)生更為清晰、更便于維護(hù)的代碼。
測(cè)試性能
從效率看, IOStreamEngine
將數(shù)據(jù)讀入其內(nèi)部緩沖區(qū),然后通過(guò)壓縮過(guò)濾器將它們寫(xiě)入 OutputStreamImpl
。這將數(shù)據(jù)直接寫(xiě)入 OutputEngineInputStream
,以便它們可供讀取。總共只執(zhí)行兩次緩沖區(qū)復(fù)制,這意味著我應(yīng)該從管道式流解決方案的緩沖區(qū)復(fù)制效率和蠻力解決方案的無(wú)線程效率的結(jié)合中獲益。
要測(cè)試實(shí)際的性能,我編寫(xiě)了一個(gè)簡(jiǎn)單的測(cè)試工具(請(qǐng)參閱所附 資源中的 test.PerformanceTest
),它使用這三個(gè)推薦的解決方案,通過(guò)使用一個(gè)空過(guò)濾器來(lái)讀取一塊啞元數(shù)據(jù)。在運(yùn)行 Java 2 SDK,版本 1.4.0 的 800 MHz Linux 機(jī)器上,達(dá)到了下列性能:
管道式流解決方案
15KB:23ms;15MB:22100ms
蠻力解決方案
15KB:0.35ms;15MB:745ms
工程解決方案
15KB:0.16ms;15MB:73ms
該問(wèn)題的工程解決方案很明顯比基于標(biāo)準(zhǔn) Java API 的另兩個(gè)方法都更有效。
順便提一下,考慮到如果輸出引擎能夠遵守這樣的約定:在將數(shù)據(jù)寫(xiě)入其輸出流后,它不修改從中寫(xiě)入數(shù)據(jù)的數(shù)組而返回,那么我能提供一個(gè)只使用一次緩沖區(qū)復(fù)制操作的解決方案。可是,輸出引擎很少會(huì)遵守這種約定。如果需要,輸出引擎只要通過(guò)實(shí)現(xiàn)適當(dāng)?shù)臉?biāo)記程序接口,就能宣稱它支持這種方式的操作。
應(yīng)用工程解決方案:讀取編碼的字符數(shù)據(jù)
任何可以用“提供對(duì)將數(shù)據(jù)反復(fù)寫(xiě)入 OutputStream
的實(shí)體的讀訪問(wèn)權(quán)”表述的問(wèn)題,都可以用這一框架解決。在這一節(jié)和下一節(jié)中,我們將研究這樣的問(wèn)題示例及其有效的解決方案。
首先,考慮要讀取 UTF-8 編碼格式的字符流的情況: InputStreamReader
類讓您將以二進(jìn)制編碼的字符數(shù)據(jù)作為一系列 Unicode 字符讀?。凰硎玖藦淖止?jié)輸入流到字符輸入流的關(guān)口。 OutputStreamWriter
類讓您將一系列二進(jìn)制編碼格式的 Unicode 字符寫(xiě)入輸出流;它表示從字符輸出流到字節(jié)輸入流的關(guān)口。 String
類的 getBytes()
方法將字符串轉(zhuǎn)換成經(jīng)編碼的字節(jié)數(shù)組。然而,這些類中沒(méi)有一個(gè)能直接讓您讀取 UTF-8 編碼格式的字符流。
清單 20 到 24 中的代碼演示了以與 IOStreamEngine
類極其相似的方式使用 OutputEngine
框架的一種解決方案。我們并不是從輸入流讀取和通過(guò)輸出流過(guò)濾器進(jìn)行寫(xiě)操作,而是從字符流讀取,并通過(guò)所選的字符進(jìn)行編碼的 OutputStreamWriter
進(jìn)行寫(xiě)操作。
清單 20. 讀取編碼的字符數(shù)據(jù)
package org.merlin.io;
import java.io.*;
/**
* An output engine that copies data from a Reader through
* a OutputStreamWriter to the target OutputStream.
*
* @author Copyright (c) 2002 Merlin Hughes <merlin@merlin.org>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*/
public class ReaderWriterEngine implements OutputEngine {
private static final int DEFAULT_BUFFER_SIZE = 8192;
private Reader reader;
private String encoding;
private char[] buffer;
private Writer writer;
|
該類的構(gòu)造器接受要從中讀取的字符流、要使用的編碼以及可選的緩沖區(qū)大小。
清單 21. 構(gòu)造器
public ReaderWriterEngine (Reader in, String encoding) {
this (in, encoding, DEFAULT_BUFFER_SIZE);
}
public ReaderWriterEngine
(Reader reader, String encoding, int bufferSize) {
this.reader = reader;
this.encoding = encoding;
buffer = new char[bufferSize];
}
|
當(dāng)該引擎初始化時(shí),它將以所選編碼格式寫(xiě)字符的 OutputStreamWriter
連接至提供的輸出流。
清單 22. 初始化輸出流寫(xiě)程序
public void initialize (OutputStream out) throws IOException {
if (writer != null) {
throw new IOException ("Already initialised");
} else {
writer = new OutputStreamWriter (out, encoding);
}
}
|
當(dāng)執(zhí)行該引擎時(shí),它從輸入字符流中讀取數(shù)據(jù),然后將它們寫(xiě)入 OutputStreamWriter
,接著 OutputStreamWriter 將它們以所選的編碼格式傳遞給相連的輸出流。至此,該框架使數(shù)據(jù)可供讀取。
清單 23. 讀取數(shù)據(jù)
public void execute () throws IOException {
if (writer == null) {
throw new IOException ("Not yet initialised");
} else {
int amount = reader.read (buffer);
if (amount < 0) {
writer.close ();
} else {
writer.write (buffer, 0, amount);
}
}
}
|
當(dāng)引擎執(zhí)行完時(shí),它關(guān)閉其輸入。
清單 24. 關(guān)閉輸入
public void finish () throws IOException {
reader.close ();
}
}
|
在這種與壓縮不同的情況中,Java I/O 包不提供對(duì) OutputStreamWriter
之下的字符編碼類的低級(jí)別訪問(wèn)。因此,這是在 Java 平臺(tái) 1.4 之前的發(fā)行版上讀取編碼格式的字符流的唯一有效解決方案。從版本 1.4 開(kāi)始, java.nio.charset
包確實(shí)提供了與流無(wú)關(guān)的字符編碼和譯碼能力。然而,這個(gè)包不能滿足我們對(duì)基于輸入流的解決方案的要求。
應(yīng)用工程解決方案:讀取序列化的 DOM 文檔
最后,讓我們研究該框架的最后一種用法。清單 25 到 29 中的代碼提供了一個(gè)用來(lái)讀取序列化格式的 DOM 文檔或文檔子集的解決方案。這一代碼的潛在用途可能是對(duì)部分 DOM 文檔執(zhí)行確認(rèn)性重新解析。
清單 25. 讀取序列化的 DOM 文檔
package org.merlin.io;
import java.io.*;
import java.util.*;
import org.w3c.dom.*;
import org.w3c.dom.traversal.*;
/**
* An output engine that serializes a DOM tree using a specified
* character encoding to the target OutputStream.
*
* @author Copyright (c) 2002 Merlin Hughes <merlin@merlin.org>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*/
public class DOMSerializerEngine implements OutputEngine {
private NodeIterator iterator;
private String encoding;
private OutputStreamWriter writer;
|
構(gòu)造器獲取要在上面進(jìn)行循環(huán)的 DOM 節(jié)點(diǎn),或預(yù)先構(gòu)造的節(jié)點(diǎn)迭代器(這是 DOM 2 的一部分),以及一個(gè)用于序列化格式的編碼。
清單 26. 構(gòu)造器
public DOMSerializerEngine (Node root) {
this (root, "UTF-8");
}
public DOMSerializerEngine (Node root, String encoding) {
this (getIterator (root), encoding);
}
private static NodeIterator getIterator (Node node) {
DocumentTraversal dt= (DocumentTraversal)
(node.getNodeType () ==
Node.DOCUMENT_NODE) ? node : node.getOwnerDocument ();
return dt.createNodeIterator (node, NodeFilter.SHOW_ALL, null, false);
}
public DOMSerializerEngine (NodeIterator iterator, String encoding) {
this.iterator = iterator;
this.encoding = encoding;
}
|
初始化期間,該引擎將適當(dāng)?shù)?OutputStreamWriter
連接至目標(biāo)輸出流。
清單 27. initialize() 方法
public void initialize (OutputStream out) throws IOException {
if (writer != null) {
throw new IOException ("Already initialised");
} else {
writer = new OutputStreamWriter (out, encoding);
}
}
|
在執(zhí)行階段,該引擎從節(jié)點(diǎn)迭代器中獲得下一個(gè)節(jié)點(diǎn),然后將其序列化至 OutputStreamWriter
。當(dāng)獲取了所有節(jié)點(diǎn)后,引擎關(guān)閉它的流。
清單 28. execute() 方法
public void execute () throws IOException {
if (writer == null) {
throw new IOException ("Not yet initialised");
} else {
Node node = iterator.nextNode ();
closeElements (node);
if (node == null) {
writer.close ();
} else {
writeNode (node);
writer.flush ();
}
}
}
|
當(dāng)該引擎關(guān)閉時(shí),沒(méi)有要釋放的資源。
清單 29. 關(guān)閉
public void finish () throws IOException {
}
// private void closeElements (Node node) throws IOException ...
// private void writeNode (Node node) throws IOException ...
}
|
序列化每個(gè)節(jié)點(diǎn)的其它內(nèi)部細(xì)節(jié)不太有趣;這一過(guò)程主要涉及根據(jù)節(jié)點(diǎn)的類型和 XML 1.0 規(guī)范寫(xiě)出節(jié)點(diǎn),所以我將在本文中省略這一部分的代碼。請(qǐng)參閱附帶的 源代碼,獲取完整的詳細(xì)信息。
結(jié)束語(yǔ)
我所提供的是一個(gè)有用的框架,它利用標(biāo)準(zhǔn)輸入流 API 讓您能有效讀取由只能寫(xiě)入輸出流的系統(tǒng)產(chǎn)生的數(shù)據(jù)。它讓我們讀取經(jīng)壓縮或編碼的數(shù)據(jù)及序列化文檔等。雖然可以使用標(biāo)準(zhǔn) Java API 實(shí)現(xiàn)這一功能,但使用這些類的效率根本不行。應(yīng)該充分注意到,這種解決方案比最簡(jiǎn)單的蠻力解決方案更有效(即使在數(shù)據(jù)不大的情況下)。將數(shù)據(jù)寫(xiě)入 ByteArrayOutputStream
以便進(jìn)行后續(xù)處理的任何應(yīng)用程序都可能從這一框架中受益。
字節(jié)數(shù)組流的拙劣性能和管道式流難以置信的蹩腳性能,實(shí)際上都是我下一篇文章的主題。在那篇文章中,我將研究重新實(shí)現(xiàn)這些類,并比這些類的原創(chuàng)者更加關(guān)注它們的性能。只要 API 約定稍微寬松一點(diǎn),性能就可能改進(jìn)一百倍了。
我討厭洗碗。不過(guò),正如大多數(shù)我自認(rèn)為是較好(雖然常常還是微不足道)的想法一樣,這些類背后的想法都是在我洗碗時(shí)冒出來(lái)的。我時(shí)常發(fā)現(xiàn)撇開(kāi)實(shí)際代碼,回頭看看并且把問(wèn)題的范圍考慮得更廣些,可能會(huì)得出一個(gè)更好的解決方案,它最終為您提供的方法可能比您找出的容易方法更好。這些解決方案常常會(huì)產(chǎn)生更清晰、更有效而且更可維護(hù)的代碼。
我真的擔(dān)心我們有了洗碗機(jī)的那一天。
參考資料
關(guān)于作者
 |
|
 |
Merlin 是 Baltimore Technologies的密碼專家和首席技術(shù)傳道者,該公司是一家愛(ài)爾蘭人開(kāi)辦的電子安全性公司,他還是兼職作家、業(yè)余門(mén)房和“洗碗工”;他不會(huì)因 JDK 1.4 而煩惱。他住在紐約州紐約市(一個(gè)非常美的城市,人們給它取了兩次名字),可以通過(guò) merlin@merlin.org與他聯(lián)系。
|