Fastdfs-javaapi-连接池
1 简绍
大家都知道fastdfs分为tracker server和storage server, tracker server是跟踪服务器,主要做调度工作,在访问上起负载均衡的作用 。storage server是存储服务器,主要负责文件的存储。 我们使用java api在分布式文件系统的文件去上传、修改、删除等操作时,有以下几步:
1) 申请与tracker server的连接
TrackerClient trackerClient = new TrackerClient();
TrackerServer trackerServer= trackerClient.getConnection(); 2) 通过trackerServer得到与storage server的连接的客户端
StorageServer ss = tc.getStoreStorage(ts);
StorageClient1 client1 = new StorageClient1(trackerServer, ss); 3) 上传文件
client1.upload_file1(fileBuff, fileExtName, null); 4) 关闭连接
ss.close();
trackerServer.close();
最终可以完成我们想要的操作,但是这两次创建的连接都是tcp/ip连接,如果每次创建完连接我们都再关闭连接。这个过程是很耗时的。
通过api申请的连接并不支持高发发(即一个storage连接只能上传一个文件),我们必须保证同一时刻一个连接上传一个文件。
由于我们用fastdfs做文件服务器,通过web的管理平台将文件上传至分布式文件系统中,每次申请完连接再关闭连接,对我们来说会延长上传文件的时间,而且上传文件高并发下,申请的连接可能突然增至几百个,这样我们的服务器的性能损耗太大了。
2 java api 源代码分析
通过查看fastdfs java api的源代码了解到。通过trackerServer和
得
到
的
new
StorageClient1(trackerServer,
storageServer
storageServer);client对象,在操作文件时,会自动检查trackerServer和storageServer是否为空,如果为空,程序会自动为任一server创建连接,待操作完成后,将创建的连接关闭。
如果storageServer为null,则程序自动创建trackerServer,根据trackerServer得到storageServer,并返回storageServer,在返回storageServer之前会关闭trackerServer。通过storageServer上传完文件之后,关闭storageServer. 如果都不空null,则api中不会关闭连接。
3 结合实际。
由于我们搭建的文件系统架构是,一个tracker和三个storage.
St1 St1 tracker St1 从这个图形可以看出,我们的tracker连接是没有必要改变的,申请一次就可以,(直接点)故我们可以为tracker创建连接池。 连接池代码如下ConnectionPool:
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit;
import org.csource.fastdfs.ClientGlobal; import org.csource.fastdfs.StorageClient1; import org.csource.fastdfs.StorageServer; import org.csource.fastdfs.TrackerClient; import org.csource.fastdfs.TrackerGroup; import org.csource.fastdfs.TrackerServer;
public class ConnectionPool {
// class method // singleton
private Object obj = new Object();
private final static String tgStr = \; private final static int port = 22122; // the limit of connection instance private int size = 5;
// busy connection instances
private ConcurrentHashMap
private ArrayBlockingQueue
busyConnectionPool = null;
null;
private ConnectionPool() { };
private static ConnectionPool instance = new ConnectionPool(); // get the connection pool instance
public static ConnectionPool getPoolInstance() { }
// class method
// init the connection pool private void init(int size) { }
initClientGlobal();
TrackerServer trackerServer = null; try { }
TrackerClient trackerClient = new TrackerClient(); //Only tracker
trackerServer = trackerClient.getConnection(); for (int i = 0; i < size; i++) { }
e.printStackTrace(); if(trackerServer!=null){ }
try { }
trackerServer.close(); e.printStackTrace(); } catch (IOException e) {
StorageServer storageServer = null; StorageClient1 client1 = new
storageServer);
return instance;
busyConnectionPool = new ConcurrentHashMap Object>(); ArrayBlockingQueue StorageClient1(trackerServer, idleConnectionPool.add(client1); } catch (IOException e) { }finally{ // so if the connection was broken due to some erros (like // : socket init failure, network broken etc), drop this connection // from the busyConnectionPool, and init one new connection. public void drop(StorageClient1 client1) { if (busyConnectionPool.remove(client1)!=null) { TrackerServer trackerServer = null; try { TrackerClient trackerClient = new TrackerClient(); //TODO 此处有内存泄露,因为trackerServer没有关闭连接 trackerServer = trackerClient.getConnection(); StorageServer storageServer = null; StorageClient1 newClient1 = new idleConnectionPool.add(newClient1); e.printStackTrace(); // 1. pop the connection from busyConnectionPool; // 2. push the connection into idleConnectionPool; // 3. do nessary cleanup works. public void checkin(StorageClient1 client1) { } if (busyConnectionPool.remove(client1)!=null) { } idleConnectionPool.add(client1); // 1. pop one connection from the idleConnectionPool, // 2. push the connection into busyConnectionPool; // 3. return the connection // 4. if no idle connection, do wait for wait_time seconds, and check public StorageClient1 checkout(int waitTimes) throws } StorageClient1 client1 = idleConnectionPool.poll(waitTimes, TimeUnit.SECONDS); busyConnectionPool.put(client1, obj); return client1; again InterruptedException { StorageClient1(trackerServer, storageServer); } catch (IOException e) { }finally{ } } } } if(trackerServer!=null){ } try { } trackerServer.close(); e.printStackTrace(); } catch (IOException e) { private void initClientGlobal() { } InetSocketAddress[] trackerServers = new InetSocketAddress[1]; trackerServers[0] = new InetSocketAddress(tgStr, port); ClientGlobal.setG_tracker_group(new // 连接超时的时限,单位为毫秒 ClientGlobal.setG_connect_timeout(2000); // 网络超时的时限,单位为毫秒 ClientGlobal.setG_network_timeout(30000); ClientGlobal.setG_anti_steal_token(false); // 字符集 ClientGlobal.setG_charset(\); ClientGlobal.setG_secret_key(null); TrackerGroup(trackerServers)); 上传文件类接口:ImageServer import java.io.File; import java.io.IOException; /** * 图片文件上传 * @author zhanghua * */