fastdfs-javaapi连接池

2020-04-17 05:21

Fastdfs-javaapi-连接池

1 简绍

大家都知道fastdfs分为tracker server和storage server, tracker server是跟踪服务器,主要做调度工作,在访问上起负载均衡的作用 。storage server是存储服务器,主要负责文件的存储。 我们使用java api在分布式文件系统的文件去上传、修改、删除等操作时,有以下几步:

1) 申请与tracker server的连接

TrackerClient trackerClient = new TrackerClient();

TrackerServer trackerServer= trackerClient.getConnection(); 2) 通过trackerServer得到与storage server的连接的客户端

StorageServer ss = tc.getStoreStorage(ts);

StorageClient1 client1 = new StorageClient1(trackerServer, ss); 3) 上传文件

client1.upload_file1(fileBuff, fileExtName, null); 4) 关闭连接

ss.close();

trackerServer.close();

最终可以完成我们想要的操作,但是这两次创建的连接都是tcp/ip连接,如果每次创建完连接我们都再关闭连接。这个过程是很耗时的。

通过api申请的连接并不支持高发发(即一个storage连接只能上传一个文件),我们必须保证同一时刻一个连接上传一个文件。

由于我们用fastdfs做文件服务器,通过web的管理平台将文件上传至分布式文件系统中,每次申请完连接再关闭连接,对我们来说会延长上传文件的时间,而且上传文件高并发下,申请的连接可能突然增至几百个,这样我们的服务器的性能损耗太大了。

2 java api 源代码分析

通过查看fastdfs java api的源代码了解到。通过trackerServer和

new

StorageClient1(trackerServer,

storageServer

storageServer);client对象,在操作文件时,会自动检查trackerServer和storageServer是否为空,如果为空,程序会自动为任一server创建连接,待操作完成后,将创建的连接关闭。

如果storageServer为null,则程序自动创建trackerServer,根据trackerServer得到storageServer,并返回storageServer,在返回storageServer之前会关闭trackerServer。通过storageServer上传完文件之后,关闭storageServer. 如果都不空null,则api中不会关闭连接。

3 结合实际。

由于我们搭建的文件系统架构是,一个tracker和三个storage.

St1 St1 tracker St1 从这个图形可以看出,我们的tracker连接是没有必要改变的,申请一次就可以,(直接点)故我们可以为tracker创建连接池。 连接池代码如下ConnectionPool:

import java.io.IOException;

import java.net.InetSocketAddress;

import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit;

import org.csource.fastdfs.ClientGlobal; import org.csource.fastdfs.StorageClient1; import org.csource.fastdfs.StorageServer; import org.csource.fastdfs.TrackerClient; import org.csource.fastdfs.TrackerGroup; import org.csource.fastdfs.TrackerServer;

public class ConnectionPool {

// class method // singleton

private Object obj = new Object();

private final static String tgStr = \; private final static int port = 22122; // the limit of connection instance private int size = 5;

// busy connection instances

private ConcurrentHashMap // idle connection instances

private ArrayBlockingQueue idleConnectionPool =

busyConnectionPool = null;

null;

private ConnectionPool() { };

private static ConnectionPool instance = new ConnectionPool(); // get the connection pool instance

public static ConnectionPool getPoolInstance() { }

// class method

// init the connection pool private void init(int size) { }

initClientGlobal();

TrackerServer trackerServer = null; try { }

TrackerClient trackerClient = new TrackerClient(); //Only tracker

trackerServer = trackerClient.getConnection(); for (int i = 0; i < size; i++) { }

e.printStackTrace(); if(trackerServer!=null){ }

try { }

trackerServer.close(); e.printStackTrace(); } catch (IOException e) {

StorageServer storageServer = null; StorageClient1 client1 = new

storageServer);

return instance;

busyConnectionPool = new ConcurrentHashMap

Object>();

ArrayBlockingQueue(size);

StorageClient1(trackerServer,

idleConnectionPool.add(client1);

} catch (IOException e) { }finally{

// so if the connection was broken due to some erros (like

// : socket init failure, network broken etc), drop this connection // from the busyConnectionPool, and init one new connection. public void drop(StorageClient1 client1) {

if (busyConnectionPool.remove(client1)!=null) {

TrackerServer trackerServer = null; try {

TrackerClient trackerClient = new TrackerClient(); //TODO 此处有内存泄露,因为trackerServer没有关闭连接 trackerServer = trackerClient.getConnection();

StorageServer storageServer = null; StorageClient1 newClient1 = new

idleConnectionPool.add(newClient1); e.printStackTrace();

// 1. pop the connection from busyConnectionPool; // 2. push the connection into idleConnectionPool; // 3. do nessary cleanup works.

public void checkin(StorageClient1 client1) { }

if (busyConnectionPool.remove(client1)!=null) { }

idleConnectionPool.add(client1);

// 1. pop one connection from the idleConnectionPool, // 2. push the connection into busyConnectionPool; // 3. return the connection

// 4. if no idle connection, do wait for wait_time seconds, and check public StorageClient1 checkout(int waitTimes) throws }

StorageClient1 client1 = idleConnectionPool.poll(waitTimes,

TimeUnit.SECONDS);

busyConnectionPool.put(client1, obj); return client1;

again

InterruptedException {

StorageClient1(trackerServer, storageServer);

} catch (IOException e) { }finally{

}

}

}

}

if(trackerServer!=null){ }

try { }

trackerServer.close(); e.printStackTrace(); } catch (IOException e) {

private void initClientGlobal() { }

InetSocketAddress[] trackerServers = new InetSocketAddress[1]; trackerServers[0] = new InetSocketAddress(tgStr, port); ClientGlobal.setG_tracker_group(new // 连接超时的时限,单位为毫秒

ClientGlobal.setG_connect_timeout(2000); // 网络超时的时限,单位为毫秒

ClientGlobal.setG_network_timeout(30000); ClientGlobal.setG_anti_steal_token(false); // 字符集

ClientGlobal.setG_charset(\); ClientGlobal.setG_secret_key(null);

TrackerGroup(trackerServers));

上传文件类接口:ImageServer

import java.io.File;

import java.io.IOException; /**

* 图片文件上传

* @author zhanghua * */


fastdfs-javaapi连接池.doc 将本文的Word文档下载到电脑 下载失败或者文档不完整,请联系客服人员解决!

下一篇:A2O工艺污水处理厂课程设计

相关阅读
本类排行
× 注册会员免费下载(下载后可以自由复制和排版)

马上注册会员

注:下载文档有可能“只有目录或者内容不全”等情况,请下载之前注意辨别,如果您已付费且无法下载或内容有问题,请联系我们协助你处理。
微信: QQ: