Jav IO/NIO 和 okio

Java IO

java IO类的综述:

  • Basic:基础
  • Arrays:数组操作流
  • Files:文件操作流
  • Pipes:管道流,线程内部通信
  • Buffering:带缓存的流,用于装饰的类
  • Filtering:过滤
  • Parsing:解析
  • Strings:字符
  • Data:数据
  • Data-Formatted:有格式的数据
  • Objects:对象
  • Utilities:工具类的流

装饰者模式

其中ConcreteComponent这个模块可以没有的,它其实不算做是装饰者,它是对Component的具体实现的一种情况。
实例:

DataInputStream dis = new DataInputStream(
               new BufferedInputStream(
               new FileInputStream("test.txt")
              ));

Java NIO

核心内容:

  • Buffer
  • Channel
  • Selector

Buffer:缓存区

在buffer中比较重要的是对对缓冲区内部状态的变化跟踪的3个指针参数:

  • position:指定了下一个将要被写入或者读取的元素索引,它的值由get()/put()方法自动更新,在新创建一个Buffer对象时,position被初始化为0。调用flip()clear()rewind()都会影响到它的值。
  • limit:指定还有多少数据需要取出(在从缓冲区写入通道时),或者还有多少空间可以放入数据(在从通道读入缓冲区时)。它是实际可操作的数据的大小。
  • capacity:指定了可以存储在缓冲区中的最大数据容量。

三者的关系:0 <= position <= limit <= capacity

缓存区的操作:

  • 分配缓存区
// 分配指定大小的缓冲区  
ByteBuffer buffer1 = ByteBuffer.allocate(10);  

// 包装一个现有的数组  
byte array[] = new byte[10];  
ByteBuffer buffer2 = ByteBuffer.wrap( array );
  • 缓冲区分片
    根据现有的缓冲区对象来创建一个子缓冲区,即在现有缓冲区上切出一片来作为一个新的缓冲区,但现有的缓冲区与创建的子缓冲区在底层数组层面上是数据共享的,也就是说,子缓冲区相当于是现有缓冲区的一个视图窗口。
    // 创建子缓冲区  
    ByteBuffer slice = buffer.slice();
    
  • 只读缓冲区
    只读缓冲区非常简单,可以读取它们,但是不能向它们写入数据。可以通过调用缓冲区的asReadOnlyBuffer()方法,将任何常规缓冲区转 换为只读缓冲区,这个方法返回一个与原缓冲区完全相同的缓冲区,并与原缓冲区共享数据,只不过它是只读的。如果原缓冲区的内容发生了变化,只读缓冲区的内容也随之发生变化。
    // 创建只读缓冲区  
    ByteBuffer readonly = buffer.asReadOnlyBuffer();
    
  • 直接缓冲区
    直接缓冲区是为加快I/O速度,使用一种特殊方式为其分配内存的缓冲区,JDK文档中的描述为:给定一个直接字节缓冲区,Java虚拟机将尽最大努 力直接对它执行本机I/O操作。也就是说,它会在每一次调用底层操作系统的本机I/O操作之前(或之后),尝试避免将缓冲区的内容拷贝到一个中间缓冲区中 或者从一个中间缓冲区中拷贝数据。要分配直接缓冲区,需要调用allocateDirect()方法,而不是allocate()方法,使用方式与普通缓冲区并无区别。
// 使用allocateDirect,而不是allocate  
ByteBuffer buffer = ByteBuffer.allocateDirect( 1024 );
  • 内存映射文件I/O
    就是MappedByteBuffer类。内存映射文件I/O是一种读和写文件数据的方法,它可以比常规的基于流或者基于通道的I/O快的多。

Channel:通道

通道是一个对象,通过它可以读取和写入数据,当然了所有数据都通过Buffer对象来处理。我们永远不会将字节直接写入通道中,相反是将数据写入包含一个或者多个字节的缓冲区。同样不会直接从通道中读取字节,而是将数据从通道读入缓冲区,再从缓冲区获取这个字节。

Channel是对数据的源头和数据目标点流经途径的抽象。

  • FileChannel:文件渠道,从文件中读写数据
  • DatagramChannel:UDP渠道,通过UDP读写网络数据
  • SocketChannel:TCP渠道,通过TCP读写网络中的数据
  • ServerSocketChannel:监听新进来的TCP连接,像Web服务器那样。对每一个新进来的连接都会创建一个SocketChannel。
scatter:分散

指在读操作时将读取的数据写入多个buffer中。Channel将从Channel中读取的数据“分散(scatter)”到多个Buffer中。

ByteBuffer header = ByteBuffer.allocate(128);
ByteBuffer body   = ByteBuffer.allocate(1024);
ByteBuffer[] bufferArray = { header, body };
channel.read(bufferArray);
gather:聚集

指在写操作时将多个buffer的数据写入同一个Channel。Channel 将多个Buffer中的数据“聚集(gather)”后发送到Channel。

ByteBuffer header = ByteBuffer.allocate(128);
ByteBuffer body   = ByteBuffer.allocate(1024);
//write data into buffers
ByteBuffer[] bufferArray = { header, body };
channel.write(bufferArray);

Selector:选择器

Nio Socket Example

服务端代码:

/*
 * Copyright (C) 2016 Baidu, Inc. All Rights Reserved.
 */
package com.pan.learn.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/**
 * socket server
 *
 * Created by panhongchao on 16/4/17.
 */
public class SocketServerExample {
    private Selector selector;
    private Map<SocketChannel, List> dataMapper;
    private InetSocketAddress listenerAddress;

    public static void main(String[] args) {
        Runnable server = new Runnable() {
            @Override
            public void run() {
                try {
                    new SocketServerExample("localhost", 8090).startServer();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        };


        Runnable client = new Runnable() {
            @Override
            public void run() {
                try {
                    new SocketClientExample().startClient();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };

        new Thread(server).start();
        new Thread(client, "client-A").start();
        new Thread(client, "client-B").start();
    }

    public SocketServerExample(String address, int port) {
        listenerAddress = new InetSocketAddress(address, port);
        dataMapper = new Hashtable<>();
    }

    private void startServer() throws IOException {
        selector = Selector.open();
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false);
        serverSocketChannel.socket().bind(listenerAddress);
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

        System.out.println("Server started...");

        while (true) {
            selector.select();
            Iterator keys = selector.selectedKeys().iterator();
            while (keys.hasNext()) {
                SelectionKey key = (SelectionKey) keys.next();
                keys.remove();

                if (!key.isValid()) {
                    continue;
                }

                if (key.isAcceptable()) {
                    accept(key);
                } else if (key.isReadable()) {
                    read(key);
                } else if (key.isConnectable()) {
                    System.out.println("key is connectable...");
                } else if (key.isValid()) {
                    System.out.println("key is valid...");
                }
            }
        }
    }

    private void accept(SelectionKey key) throws IOException {
        ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
        SocketChannel channel = serverSocketChannel.accept();
        channel.configureBlocking(false);
        Socket socket = channel.socket();
        SocketAddress remoteAddr = socket.getRemoteSocketAddress();

        System.out.println("Connected to: " + remoteAddr);

        dataMapper.put(channel, new ArrayList());
        channel.register(this.selector, SelectionKey.OP_READ);
    }

    private void read(SelectionKey key) throws IOException {
        SocketChannel channel = (SocketChannel) key.channel();
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        int numRead = -1;
        numRead = channel.read(buffer);

        if (numRead == -1) {
//            dataMapper.remove(channel);
            Socket socket = channel.socket();
            SocketAddress remoteAddr = socket.getRemoteSocketAddress();

            System.out.println("Connection closed by client: " + remoteAddr);

            channel.close();
            key.cancel();
            return;
        }

        byte[] data = new byte[numRead];
        System.arraycopy(buffer.array(), 0, data, 0, numRead);
        System.out.println("Got: " + new String(data));
    }

}

客户端代码:

/*
 * Copyright (C) 2016 Baidu, Inc. All Rights Reserved.
 */
package com.pan.learn.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;

/**
 * socket client
 *
 * Created by panhongchao on 16/4/17.
 */
public class SocketClientExample {
    public void startClient() throws IOException, InterruptedException {

        InetSocketAddress hostAddress = new InetSocketAddress("localhost", 8090);
        SocketChannel client = SocketChannel.open(hostAddress);

        System.out.println("Client... started");

        String threadName = Thread.currentThread().getName();

        // Send messages to server
        String[] messages = new String[] {threadName + ": test1", threadName + ": test2", threadName + ": test3"};

        for (int i = 0; i < messages.length; i++) {
            byte[] message = messages[i].getBytes();
            ByteBuffer buffer = ByteBuffer.wrap(message);
            client.write(buffer);
            System.out.println(messages[i]);
            buffer.clear();
            Thread.sleep(5000);
        }
        client.close();
    }

}

流 程:

results matching ""

    No results matching ""