缘起

最近的项目使用activemq比较多,遇到了点坑,总结一下。用spring提供的jmsTemplate,包装一下jms的原始接口,使得消息发送代码很简洁;同时DefaultMessageListenerContainer,对消费消息也做了一定的包装,可以配置消费者数量的上下限。

pom依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>${spring-framework.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>${spring-framework.version}</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-core</artifactId>
<version>5.7.0</version>
</dependency>

消息发送:jmsTemplate

如果消费端需要给多方消费使用,那么最好采用VirtualTopic的模式,具体方法就是在发送端将队列名字命名VirtualTopic.xxxx,同时把队列设置成topic;在消费端,以queue的形式消费,队列名字设置为Consumer.xx.VirtualTopic.xxxx,这种订阅消费是持久订阅,所以一旦消费,不用担心中途停止而丢失消息,不过要注意消息积压。

1.ActiveMQConnectionFactory

  • spring配置

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:context="http://www.springframework.org/schema/context"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
    http://www.springframework.org/schema/context
    http://www.springframework.org/schema/context/spring-context-4.2.xsd">
    <bean id="idolScoreMqPropertyConfig"
    class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
    <property name="fileEncoding" value="UTF-8" />
    <property name="locations">
    <list>
    <value>classpath*:properties/acmq-${profiles.active}.properties</value>
    </list>
    </property>
    </bean>
    <!-- ActiveMQ Topic -->
    <bean id="idolScoreDestination" class="org.apache.activemq.command.ActiveMQTopic">
    <constructor-arg index="0">
    <value>${activemq.idolscore.queuename}</value>
    </constructor-arg>
    </bean>
    <!-- ActiveMQ 连接工厂 -->
    <bean id="idolScoreConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL">
    <value>${activemq.idolscore.server}</value>
    </property>
    <property name="userName">
    <value>${activemq.idolscore.user}</value>
    </property>
    <property name="password">
    <value>${activemq.idolscore.password}</value>
    </property>
    </bean>
    <!-- jms queue template -->
    <bean id="jmsTemplate"
    class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="idolScoreConnectionFactory" />
    <property name="defaultDestination" ref="idolScoreDestination" />
    </bean>
    <bean id="bombIdolScoreMsgProducer"
    class="selflearning.mq.BombIdolScoreMsgProducer">
    <property name="jmsTemplate" ref="jmsTemplate"/>
    </bean>
    </beans>
  • 消息发送代码

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    public class BombIdolScoreMsgProducer {
    private static BombIdolScoreMsgProducer producer = null;
    private JmsTemplate jmsTemplate;
    private BombIdolScoreMsgProducer() {}
    public static BombIdolScoreMsgProducer instance() {
    if (producer == null) {
    synchronized (BombIdolScoreMsgProducer.class) {
    if (producer == null) {
    ApplicationContext context = new ClassPathXmlApplicationContext(
    new String[] {"classpath*:spring/idolscore-mq.xml"});
    producer = context.getBean(BombIdolScoreMsgProducer.class);
    }
    }
    }
    return producer;
    }
    public void send(final BombIdolScoreMessage message) {
    jmsTemplate.send(new MessageCreator() {
    @Override
    public Message createMessage(Session session) throws JMSException {
    return session.createTextMessage(message.toString());
    }
    });
    }
    public void setJmsTemplate(JmsTemplate jmsTemplate) {
    this.jmsTemplate = jmsTemplate;
    }
    }

消息发送如果按照上面的方式,完全可以工作,但是效率不高,每次发送一条消息需要新建connection、session和producer,发完之后,还需要关闭,这个过程需要请求mq broker7次。

JMS is designed for high performance. In particular its design is such that you are meant to create a number of objects up front on the startup of your application and then resuse them throughout your application. e.g. its a good idea to create upfront and then reuse the following

  • Connection
  • Session
  • MessageProducer
  • MessageConsumer
    The reason is that each create & destroy of the above objects typically requires an individual request & response with the JMS broker to ensure it worked. e.g. creating a connection, session, producer, then sending a message, then closing everything down again - could result in 7 request-responses with the server!

#

2.PooledConnectionFactory

解决上面低效问题的方法就是池化连接,需要引入新的依赖,实测效果显示池化之后确实提高好几倍。

1
2
3
4
5
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
<version>5.14.1</version>
</dependency>

spring配置修改,可以设置最大连接数,最大active session功能

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
<!-- a pooling based ActiveMQ 连接工厂 -->
<bean id="idolScoreConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
<property name="maxConnections" value="3"/>
<property name="maximumActiveSessionPerConnection" value="10"/>
<property name="connectionFactory">
<bean class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL">
<value>${activemq.idolscore.server}</value>
</property>
<property name="userName">
<value>${activemq.idolscore.user}</value>
</property>
<property name="password">
<value>${activemq.idolscore.password}</value>
</property>
</bean>
</property>
</bean>

  • 遇到的问题1:Initialization of bean failed; nested exception is java.lang.reflect.MalformedParameterizedTypeException
    这是因为pom里依赖了commons-pool和commons-pool2,将activemq-pool版本升到5.14就都依赖commons-pool2了,这样就米有问题了。
  • 遇到的问题2:failover的两个broker同时收到消息
    解决方案很简单,将failover的url里randomize=false

消息接收:DefaultMessageListenerContainer

  • spring配置

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    <!-- 异步接收Queue消息Container -->
    <bean id="jmsTemplate"
    class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="destination" ref="destination"/>
    <property name="messageListener" ref="entityUpdateMessageListener"/>
    <!-- 初始5个Consumer, 可动态扩展到10 -->
    <property name="concurrentConsumers" value="5"/>
    <property name="maxConcurrentConsumers" value="10"/>
    <!-- 设置消息确认模式为Client -->
    <property name="sessionAcknowledgeModeName" value="AUTO_ACKNOWLEDGE"/>
    <property name="autoStartup" value="true"/>
    </bean>
    <bean id="entityUpdateMessageListener"
    class="selflearning.controller.PopCircleMessageListener"/>
    <bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg index="0">
    <value>${paopao.activemq.circle.queuename}</value>
    </constructor-arg>
    </bean>
  • 消息处理的代码

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    public class PopCircleMessageListener implements MessageListener {
    public void onMessage(Message message) {
    if (message instanceof TextMessage) {
    TextMessage textMessage = (TextMessage) message;
    try {
    String msgJson = textMessage.getText();
    System.out.println("----> Raw msg: " + msgJson);
    } catch (JMSException e) {
    e.printStackTrace();
    }
    }
    }
    }

reference

  1. http://elim.iteye.com/blog/1893038
  2. http://activemq.apache.org/spring-support.html
  3. https://codedependents.com/2009/10/16/efficient-lightweight-jms-with-spring-and-activemq/
  4. http://wiki.qiyi.domain/pages/viewpage.action?pageId=6817067#ActiveMQ开发者手册-FAQ

缘起

最近遇到个问题,我需要处理上游生成的很多数据文件,文件统一存放在hdfs上,所以方便用mr处理,但是单个文件的size差别很大,有几m的,也有几g对。我的整个处理流程分两步,第一步预处理文件,因为上游任务会陆续生产文件,所以单独起mr分别预处理;第二步,把预处理的文件全部merge在一起,预处理的文件我都是采用gz压缩,所以每个文件会启一个map,导致两个问题,1)大文件会拖慢整个job的节奏,2)小文件处理启动的map数量过多。

sequence file

解决方案,就是预处理结果采用sequencefile格式存储,因为sequencefile格式的文件支持大文件split成多个map执行,多个小文件合并成一个map任务执行。用sequencefile,需要注意一个问题,就是下游使用这些文件的map任务的key类型,要和上游输出的key类型一致,例如我们例子中的LongWritable类型;另外,如果mr的sequencefile输出要导入到hive中使用,那么hive就会自动忽略key字段,一般的解决方案,就是输出一个new Text(“”)的key,然后需要的字段学到value中。

代码

  1. 预处理mr代码,需要setOutputFormatClass为SequenceFileOutputFormat.class,另外注意这里设置OutputKeyClass为LongWritable,下游任务的map key需要保持一致。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    public class CookAsPb extends Configured implements Tool {
    public int run(String[] strings) throws Exception {
    Configuration conf = getConf();
    conf.set("mapreduce.map.output.compress", "true");
    ...
    Job job = Job.getInstance(conf);
    job.setJarByClass(getClass());
    job.setMapperClass(CookAsPbMapper.class);
    job.setOutputKeyClass(LongWritable.class);
    job.setOutputValueClass(Text.class);
    job.setOutputFormatClass(SequenceFileOutputFormat.class);
    FileOutputFormat.setCompressOutput(job, true);
    FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
    ...
    return ret;
    }
    public static void main(String[] args) throws Exception {
    int ret = ToolRunner.run(new Configuration(), new CookAsPb(), args);
    System.exit(ret);
    }
    }
  2. mereg任务代码,setInputFormatClass为SequenceFileOutputFormat.class,mapper代码的input key为LongWritable。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    public class MergePbs extends Configured implements Tool {
    public int run(String[] strings) throws Exception {
    ...
    job.setMapperClass(MergePbsMapper.class);
    job.setInputFormatClass(SequenceFileInputFormat.class);
    ...
    }
    }
    public class MergePbsMapper extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    ...
    }
    }

缘起

今年双十一在老婆的鼓励之下,入手了一个MacBook Air。我觉得苹果电脑好用的精髓就是,从此可以告别鼠标了,不论干啥事,都很舒坦。熟悉了几天之后,就萌生了把之前的hexo博客写作环境搬到Mac上来。于是乎就折腾了一番。

具体步骤


一.必备的工具

  1. node.js下的巨慢无比。
  2. git:xcode安装好就可以了。

二.hexo

  1. 安装完成hexo之后

    1
    2
    3
    4
    5
    6
    7
    $ npm install hexo-deployer-git --save
    $ mkdir blog
    $ hexo init blog
    $ cd blog
    $ rm -rf source
    $ git clone git@github.com:niaokedaoren/blog-src.git source
    $ git clone https://github.com/wuchong/jacman.git themes/jacman
  2. 拷贝source目录下备份的 _config.yml到相应目录下即可。

  3. 修改一下scaffolds的post.md
    1
    2
    3
    4
    5
    6
    ---
    title: {{ title }}
    date: {{ date }}
    categories:
    tags:
    ---

缘起

想把protobuf的message按字节数组输出,以前写到hbase里,很自然;但是现在需要写到hdfs上,突然就没法子了。google大法,发现有个base64的东西,问题一下迎刃而解。

  1. 输出字节数组:转成base64 string;
  2. 读取字节数组:从base64 string反解回来。

Dependencies

commons-codec,这个包比较好用,一堆thread safe的静态函数,就能搞定问题。

1
2
3
4
5
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>1.10</version>
</dependency>

原理

  1. 每3个8位二进制码位一组,转换为4个6位二进制码为一组(不足6位时地位补0)。3个8位二进制码和4个6位二进制码长度都是24位。
  2. 对获得的4个6位二进制码补位,每个6位二进制码添加两位高位0,组成4个8位二进制码。
  3. 将获得的4个8位二进制码转换为4个十进制码。
  4. 将获得的十进制码转换为Base64字符表中对应的字符,对照表

另外,当原文的二进制码长度不是24的倍数,最终转换为十进制时也不足4项,这时就需要用=补位。
RFC2045还规定每行位76个字符,每行末尾需添加一个回车换行符(\r\n),即便是最后一行不够76个字符,也要加换行符。

commons-codec 三种模式

  1. 标准的RFC2045,每76个字符加上(\r\n),有补位等号,即chunked的模式
  2. 有补位等号,但是不加(\r\n),即url unsafe模式
  3. Url Base64,也就是将“+”和“\”换成了“-”和“_”符号,且不适用补位,即url safe模式

例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class Base64Test {
private static String decode(String base64Str) {
return new String(Base64.decodeBase64(base64Str));
}
public static void main(String[] args) {
String text = "hello, Base64!";
String base64Text = Base64.encodeBase64String(text.getBytes());
String base64TextChunked = new String(Base64.encodeBase64Chunked(text.getBytes()));
String base64UrlSafeText = Base64.encodeBase64URLSafeString(text.getBytes());
System.out.println("text: " + text.length());
System.out.println("text byte: " + text.getBytes().length);
System.out.println("url-unsafe base64: " + base64Text.length());
System.out.println("chunked base64: " + base64TextChunked.length());
System.out.println("url-safe base64: " + base64UrlSafeText.length());
System.out.println(base64Text);
System.out.println(base64TextChunked);
System.out.println(base64UrlSafeText);
System.out.println(decode(base64Text));
System.out.println(decode(base64TextChunked));
System.out.println(decode(base64UrlSafeText));
}
}
/* output
text: 14
text byte: 14
url-unsafe base64: 20
chunked base64: 22
url-safe base64: 19
aGVsbG8sIEJhc2U2NCE=
aGVsbG8sIEJhc2U2NCE=
aGVsbG8sIEJhc2U2NCE
hello, Base64!
hello, Base64!
hello, Base64!
*/
  • 分析首三字母 hel –> aGVs
    二进制表示:
    h = 01101000
    e = 01100101
    l = 01101100
c1 c2 c3 c4
011010 000110 010101 101100
26 6 21 44
a G V s
  • 分析末尾两个字母 4!
    根据规则,3个原始byte一组转换,最后剩余的byte组成一组,
    二进制表示:
    4 = 00110100
    ! = 00100001
    每6个bit构成base64编码的一个字母,剩余4个bit后面需补充2个0bit,构成第三个字母
c1 c2 c3
001101 000010 000100
13 2 4
N C E

因为只有NCE三个字母,所以需要一个部位等号,凑足4个字母,如果按照标准的base64编码,还需要加上 \r\n

结合protobuf的应用

1
2
3
4
5
6
7
8
9
10
11
12
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
UserModel.Builder builder = UserModel.newBuilder();
for (Text entry : values) {
byte[] pbUserModel = Base64.decodeBase64(entry.toString());
builder.mergeFrom(pbUserModel);
}
UserModel model = builder.build();
String base64Str = Base64.encodeBase64URLSafeString(model.toByteArray());
context.write(key, new Text(base64Str));
}

利弊

优点
  1. 7 Bit ASCII characters are safe for transmission over the network and between different systems
  2. SMPT protocol for emails supports only 7 Bit ASCII Characters. Base64 is the commonly used technique to send binary files as attachments in emails.
缺点
  1. Base64 encoding bloats the size of the original binary stream by 33 percent
  2. Encoding/Decoding process consumes resources and can cause performance problems

reference:

  1. https://commons.apache.org/proper/commons-codec/apidocs/org/apache/commons/codec/binary/Base64.html
  2. http://aub.iteye.com/blog/1129273
  3. http://dev-faqs.blogspot.hk/2012/12/advantages-of-base-64-encoding.html

时间飞逝,转眼就是2016,由于2015下半年事情太多,就把读书这事给松懈了,现在捡起来继续。

平时用java io主要是读写文件,网络传输方面基本不涉及。用java io API给我的最大感受就是读写一下文件,为啥要new那么多类,一层包一层,好混乱。这几天,读了java编程思想的文件IO这一章,稍微弄明白其内在逻辑,写篇博文总结总结。本文相关类依赖图都出自深入分析 Java I/O 的工作机制

基于字节流的API: InputStream 和 OutputStream

java io的API是不断演进的,最开始只有支持byte流读写的API,其基类就是InputStream和OutputStream。简单看一下各个类的关系。
InputStream
OutputStream

都是基于字节的,和字符编码没有关系:

1
2
int read(byte[] b, int off, int len)
void write(byte[] b, int off, int len

输入输出的类关系图,很相似,就以OutputStream为例说道说道,直接子类有ByteArrayOutputStream,FileOutputStream,FilterOutputStream,ObjectOutputStream,PipedOutputStream,其中FilterOutputStream类比较特殊,这是一个装饰类,其构造函数是FilterOutputStream(OutputStream out),这是个典型的装饰模式,所以我们在使用相关API的时候,需要Stream对象一层层包起来,达到一个组合的效果,例如

1
new BufferedInputStream(new FileInputStream(filename))。

下面举个文件copy的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
private static void copy(String from, String to) throws IOException {
FileInputStream in = null;
FileOutputStream out = null;
try {
in = new FileInputStream(from);
out = new FileOutputStream(to);
byte[] buf = new byte[1024];
int n = 0;
while ((n = in.read(buf)) > 0) {
out.write(buf, 0, n);
}
} finally {
if (in != null) {
in.close();
}
if (out != null) {
out.close();
}
}
}

基于字符的API:Reader 和 Writer

可能是为了国际化,java后来推出了基于字符char的API:

1
2
int read(char[] cbuf, int off, int len)
void write(char[] cbuf, int off, int len)

Reader、Writer的子类关系图和OutputStream、InputStream很相似,都基于装饰模式。
Reader
Writer

适配器API:InputStreamReader 和 OutputStreamWriter

从基于byte的InputStream转到基于字符char的Reader,需要适配器InputStreamReader,如果字节编码格式不是系统默认的,还需要指定编码格式,例如UTF-8等,否则读入乱码,特别是中文。

1
InputStreamReader(InputStream in, String charsetName)

同样OutputStreamWriter,也充当着类似的角色。适配器模式和装饰模式类似,调用相关API,也需要一层层包起来,例如

1
new BufferedReader(new InputStreamReader(new FileInputStream(filename), "UTF-8"))

下面举个读取文件直接print的例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
private static void display(String filename) throws IOException {
FileInputStream in = null;
BufferedReader bufReader = null;
try {
in = new FileInputStream(filename);
bufReader = new BufferedReader(new InputStreamReader(in, "UTF-8"));
String line = null;
while ((line = bufReader.readLine()) != null) {
System.out.println(line);
}
} finally {
if (in != null) {
if (bufReader != null) {
bufReader.close();
} else {
in.close();
}
}
}
}

Overview

Collections UML
这一章主要讲java容器类,以及相关类的一些基本用法,看完之后对这些java容器之间关系有了一定的把握。总的来说,java容器分为两大类:

  • Collection
    该接口定义了线性容器,List、Set、Queue都是继承自它。
  • Map
    该接口定义了key-value对的容器。

同时Collections类里面定义了一堆静态函数,提供辅助功能,方便用户操作容器类,例如sort、二分查找、shuffle等功能。

Java容器类最大的特点就是接口定义和具体实现是解耦的,这个很方便第三方提供其它的优化实现,来替代java sdk中的实现,例如著名的guava库。

看书,除了温故,当然还有知新。我不喜欢泛泛总结,下面还是举些例子。

Comparable vs Comparator

以前也看过,但是老是映像不深,总觉的差不多,因为两个都是接口。现在谈谈本次看书的体会:

  • Comparable
    需要实现compareTo方法,这个接口像是一个特质(用scala里的trait解释,会更好),一个类实现了这个接口,就说明这个类的对象具有可比性这个特质,这样在调用Collections的sort方法时,就不需要额外提供其它的比较器。
  • Comparator
    需要实现compare方法,这个接口更像是一种策略模式,它是独立于需要比较的类;不同的实现可以给同一个类提供不同的比较策略,使用起来会更加灵活。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
class User implements Comparable<User> {
final int id;
final String name;
User(int id, String name) {
this.id = id;
this.name = name;
}
public int compareTo(User o) {
if (o == null) {
return -1;
}
return this.id - o.id;
}
public String toString() {
return "(" + id + ", " + name + ")";
}
}

现在有User类,实现了Comparable接口,所以具有User类对象之间具有可比性,现在的实现是按照id来排序的。

测试代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class ComparableVsComparator {
private static List<User> generateTestData() {
List<User> users = new ArrayList<User>();
users.add(new User(2, "Jack"));
users.add(new User(1, "Tom"));
users.add(new User(3, "Lucy"));
return users;
}
private static void testComparable() {
List<User> users = ComparableVsComparator.generateTestData();
Collections.sort(users);
System.out.println(users);
}
public static void main(String[] args) {
System.out.println("Test comparable: ");
ComparableVsComparator.testComparable();
}
/* Output
Test comparable:
[(1, Tom), (2, Jack), (3, Lucy)]
*/
}

输出结果是按照user id来升序排列的,一切都显得很顺利,但是突然有需求,需要对user name进行字母序排序,这下Comparator可以拿出来一展身手了,原来的代码完全不用修改,只需要另外提供一个字母序比较的Comparator实现。

1
2
3
4
5
class UserComparatorViaName implements Comparator<User> {
public int compare(User o1, User o2) {
return o1.name.compareTo(o2.name);
}
}

测试代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
...
private static void testComparator() {
List<User> users = ComparableVsComparator.generateTestData();
Collections.sort(users, new UserComparatorViaName());
System.out.println(users);
}
public static void main(String[] args) {
System.out.println("Test comparator: ");
ComparableVsComparator.testComparator();
}
/* Output
Test comparator:
[(2, Jack), (3, Lucy), (1, Tom)]
*/

Queue and Stack

因为List下的ArrayList、LinkedList,以及Set下的HashSet、Map下的HashMap平时都比较用的多,就不举例子了。下面就着重介绍一下Queue和Stack,官方API推荐使用双端队列接口Deque的一种实现ArrayDeque来模拟Queue和Stack的行为,官方文档中说ArrayDeque要比LinkedList速度要快,但同样是线程不安全的。

Resizable-array implementation of the Deque interface. Array deques have no capacity restrictions; they grow as necessary to support usage. They are not thread-safe; in the absence of external synchronization, they do not support concurrent access by multiple threads. Null elements are prohibited. This class is likely to be faster than Stack when used as a stack, and faster than LinkedList when used as a queue.

  • Queue

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    public static void testQueue(int n) {
    Deque<String> queue = new ArrayDeque<String>();
    for (int i=0; i<n; i++) {
    queue.addLast(generator.next()); // push
    }
    System.out.println("Queue: " + queue);
    System.out.print("Pop order:");
    while (queue.isEmpty() == false) {
    String head = queue.removeFirst(); // pop
    System.out.print(" " + head);
    }
    System.out.println();
    }
  • Stack

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    public static void testStack(int n) {
    Deque<String> stack = new ArrayDeque<String>();
    for (int i=0; i<n; i++) {
    stack.addFirst(generator.next()); // push
    }
    System.out.println("Stack: " + stack);
    System.out.print("Pop order:");
    while (stack.isEmpty() == false) {
    String head = stack.removeFirst(); // pop
    System.out.print(" " + head);
    }
    System.out.println();
    }

一个例子

接口很好很强大,如果泛泛而谈,我感觉很虚,不实在,那么就从这个小例子谈谈自己的理解吧。在机器学习算法里面,有些会涉及各种类型的距离计算,例如计算与给定点a距离最近的点b,我们可以用欧氏距离,也可以使用马氏距离,或者是向量cos角度值等等。

很容易发现,计算最近点的算法流程是基本不变的,而变化的部分是距离计算方式,那么我们可以把距离计算方式独立出来,作为一个接口。这个接口可以被实现成不同计算形式,从而做到灵活扩展,但是算法流程部分代码是不需要变更的。这样从某种程度上就做到了“对修改关闭,对扩展开放。”这一经典的做法,就是大名鼎鼎的策略模式。

初体验

下面看看具体代码(已省去逻辑细节):

  • 最近邻算法流程

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    public class OneNearestNeighbour {
    private List<DataPoint> dataset;
    private DistanceCalculator distanceCalculator;
    public OneNearestNeighbour(List<DataPoint> dataset, DistanceCalculator calculator) {
    this.dataset = dataset;
    distanceCalculator = calculator;
    }
    public DataPoint getNearestNeighbour(DataPoint a) {
    DataPoint result = null;
    double min = Double.MAX_VALUE;
    for (DataPoint b : dataset) {
    double distance = distanceCalculator.distance(a, b);
    if (distance < min) {
    result = b;
    min = distance;
    }
    }
    return result;
    }
    }
  • 距离计算接口

    1
    2
    3
    public interface DistanceCalculator {
    double distance(DataPoint a, DataPoint b);
    }
  • 计算欧氏距离

    1
    2
    3
    4
    5
    6
    7
    8
    public class EuclideanDistance implements DistanceCalculator {
    public double distance(DataPoint a, DataPoint b) {
    double result = 0;
    // ...
    return result;
    }
    }

这样就可以用欧氏距离计算最近邻了。

1
2
3
4
5
6
public static void main(String[] args) {
// ...
DistanceCalculator calculator = new EuclideanDistance();
OneNearestNeighbour oneNN = new OneNearestNeighbour(dataset, calculator);
oneNN.getNearestNeighbour(point);
}

进一步

现在我们厌倦了欧氏距离,想试试马氏距离的效果,所需做的就是用马氏距离实现接口DistanceCalculator,算法流程无需变更。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class MahalanobisDistance implements DistanceCalculator {
public double distance(DataPoint a, DataPoint b) {
double result = 0;
// ...
return result;
}
public static void main(String[] args) {
// ...
DistanceCalculator calculator = new MahalanobisDistance();
OneNearestNeighbour oneNN = new OneNearestNeighbour(dataset, calculator);
oneNN.getNearestNeighbour(point);
}
}

再进一步

有些时候,我们无需从零开始实现接口DistanceCalculator,像上面的两个距离计算方式。例如,我们现在已经有两个点相似度的计算类PointsSimilarity

1
2
3
4
5
6
7
public class PointsSimilarity {
double similarity(DataPoint a, DataPoint b) {
double result = 0;
// ...
return result;
}
}

PointsSimilarity类借助适配模式,包装成接口DistanceCalculator的实现,就可以巧妙的融入到最近邻算法当中。

1
2
3
4
5
6
7
8
9
10
11
12
13
public class PointsSimilarityAdapter implements DistanceCalculator {
private PointsSimilarity pointsSimilarity;
public PointsSimilarityAdapter(PointsSimilarity pointsSimilarity) {
this.pointsSimilarity = pointsSimilarity;
}
public double distance(DataPoint a, DataPoint b) {
return 1 / (1 + pointsSimilarity.similarity(a, b));
}
}

反思

其实在上面这个例子中,接口DistanceCalculator完全可以用一个抽象类来代替,可以达到一模一样的效果。但是如果类PointsSimilarity在适配的时候,需要继承另一个类,那样用抽象类就没法适配了。

接口和抽象类都可以做到不能被直接实例化,通过抽象方法来规范子类的行为,相当于所有子类都会遵守这个“契约”,所以子类可以向上泛化成父类,对调用者隐藏具体的实现。而接口与抽象类相比,最大的优势就是可以多重继承。另外,使用接口就意味着大家都要准守契约,可以做到具体实现和接口调用的解耦,可以很方便些测试用例,mock一下接口,各个模块就可以独立测试,著名的mvc模式就是这么搞的。

TIJ的作者在本章的最后给出了一个忠告:

恰当的原则应该是优先使用类而不是接口。从类开始,如果接口的必需性变得非常明确,那么就进行重构。接口是一种工具,但是它容易被滥用。

这句话我理解不深,在以后的工作学习中,多琢磨,多尝试。

多态是TIJ的第八章,记录一些琐碎的语言细节和陷阱,平时更喜欢用interface的形式来实现多态,那样感觉会更顺眼,纯属个人喜好,O(∩_∩)O哈哈~

一个简单例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package thinking.in.java.polymorphism;
import static thinking.in.java.util.Print.*;
public abstract class Cycle {
public static void echo(Cycle cycle) {
println(cycle.name() + " has " + cycle.wheels() + " wheels.");
}
public String name() {
return getClass().getSimpleName();
}
public abstract int wheels();
public static void main(String[] args) {
Cycle.echo(new Bicycle());
Cycle.echo(new Tricycle());
/* output:
Bicycle has 2 wheels.
Tricycle has 3 wheels.
*/
}
}
class Bicycle extends Cycle {
public int wheels() {
return 2;
}
}
class Tricycle extends Cycle {
public int wheels() {
return 3;
}
}

在java的世界里,父类中除了static和final方法(private方法也是final方法,这个设定是很合理的,因为private方法是无法被继承的),其它方法都可以被子类继承(或者override),这些方法运行时都是需要动态绑定到具体对象类型,相当于c++里的方法加了virtual字段。那么,为什么static、final、private方法不需要做额外的动态绑定开销呢?原因很简单,static方法是属于类的,和具体对象无关,而final和private方法是无法继承覆盖的,其对应的对象类型是固定的。一般,编译器会对final方法进行展开处理,来提高效率,就像c++里的inline函数。不过对于类的方法能不加final,就尽量不加,但是能private的,尽量private。

现在说说上面这个简单例子,Cycle.echo()方法传入的都是Cycle的子类类型,都可以upcast到Cycle类型,但是具体运行到name()和wheels()方法时候,都进行了动态绑定,执行的是子类的方法。这样做的好处是在加入新的子类,e.g. Unicycle时候,基类Cycle根本不需要做任何改变,执行echo方法,如果传入的是Unicycle的对象,就会动态绑定到新编写的方法。这样就可以做到“对修改关闭,对扩展开放了”。

Covariant Return

子类在覆盖基类的方法时,方法的签名是必须一致的,但是return的类型不一定要一致,子类返回的是父类返回类型的子类型也是可以的,因为这完全不妨碍upcast。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
class Grain {
public String toString() { return "Grain"; }
}
class Wheat extends Grain {
public String toString() { return "Wheat"; }
}
class Mill {
Grain process() { return new Grain(); }
}
class WheatMill extends Mill {
@Override
Wheat process() { return new Wheat(); }
}
public class CovariantReturn {
public static void main(String[] args) {
Mill m = new Mill();
Grain g = m.process();
println(g);
m = new WheatMill();
g = m.process();
println(g);
/* output:
Grain
Wheat
*/
}
}

Mill的process()和子类WheatMill的process()就是存在这个差异。

Pitfall:构造器中调用动态绑定函数

最好还是不要那样做,有可能产生匪夷所思的错误。下面看个TIJ书上的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
class Glyph {
void draw() { println("Glyph.draw()"); }
Glyph() {
println("Glyph() before draw()");
draw();
println("Glyph() after draw()");
}
}
class RoundGlyph extends Glyph {
private int radius = 1;
@Override
void draw() { println("RoundGlyph.draw(), radius = " + radius); }
RoundGlyph() {
println("RoundGlyph(), radius = " + radius);
}
}
public class PolyConstructors {
public static void main(String[] args) {
new RoundGlyph();
/* output:
Glyph() before draw()
RoundGlyph.draw(), radius = 0
Glyph() after draw()
RoundGlyph(), radius = 1
*/
}
}

在运行new RoundGlyph();这个语句时候,先要调用父类Glyph的构造器,但是Glyph()的draw()方法是动态绑定到RoundGlyph的对象上的,而该对象尚未构造完成,所有的实例域都被初始化为二进制0,所以打印出来的radius是0。

缘起

平时使用java,写的最多也就是mapreduce代码,除了刚开始会带来一点新鲜感,等到熟悉map、reduce这一套之后,剩下的就是机械重复。这种厌倦感重新燃起了深入学习java的欲望。正好家里有一本java编程思想,就抓起来看了第七章,略有一点小感悟,就作为读书笔记写下来。希望自己能坚持看下去、写下去。


组合和继承

感觉平时还是用组合比较多,继承基本不用(除了继承一下Mapper和Reducer,这种条条框框的代码,用继承,确实可以给使用者带来不少便利)。我觉得组合最大的好处就是给了复用代码的程序员更大的定制空间:

  1. 可以选择性的暴露原有的接口;
  2. 重新定制一下原有的接口(当然要真正达到以假乱真的效果,还是需要继承一下,Adapter模式就是这么搞的)。

另外,对于继承,父类想要暴露出来的接口,应该是public,因为二次开发想要继承的子类很有可能是出现在其他包下面。

语言细节

构造函数调用顺序

构造函数的初始化顺序,一般都是先父类,后子类。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
class Game {
Game() {
println("Game");
}
}
class BoardGame extends Game {
BoardGame(int size) {
// The compiler auto-insert super() here.
println("BoardGame");
}
}
public class Chess extends BoardGame {
Chess() {
super(11);
println("Chess");
}
public static void main(String[] args) {
new Chess();
/* output:
Game
BoardGame
Chess
*/
}
}

在生成Chess对象,首先会调用父类BoardGame(在BoardGame的构造方法中,编译器会自动加入父类Game的默认构造方法调用),发现BoardGame还有父类Game,所以整个初始化顺序就是:
Game -> BoardGame -> Chess

类的加载顺序

类的加载发生在第一次new对象或调用static变量(方法)时。其变量的初始化顺序是先static变量,后实例变量;先父类,后子类。这个顺序是比较合理的,static变量是独立于类的实例的,所以在类加载的时候率先初始化,而实例变量等到真正需要new一个实例的时候再初始化;同样,子类是依赖于父类的,所以父类先初始化。这个过程很好验证,下面给个例子说明。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
class Insect {
Insect() {
println("Insect: constructor");
}
{
println("Insect: instance initialization");
}
static {
println("Insect: static initialization");
}
}
class Beetle extends Insect {
static int loadTimes = 0;
Beetle() {
println("Beetle: constructor");
}
static {
println("Beetle: static initialization");
loadTimes++;
}
}

  • Insect的初始化顺序
    1
    new Insect();

Insect: static initialization
Insect: instance initialization
Insect: constructor

  • Insect和Beetle的初始化顺序
    1
    new Beetle();

Insect: static initialization
Beetle: static initialization
Insect: instance initialization
Insect: constructor
Beetle: constructor

final 保留字段的用法

final修饰符,大概有三个用途,修饰变量、方法、类。

修饰变量

final修饰变量,那么变量就成了常量,初始化之后不能“修改”,对于基本类型无法修改值,对于非基本类型,无法修改引用指向的对象,但是对象的内容还是可以修改的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class FinalData {
final int n = 4;
final int[] array = new int[n];
public static void main(String[] args) {
FinalData fd = new FinalData();
// fd.n = 5; // The final field FinalData.n cannot be assigned.
// fd.array = new int[4]; // The final field FinalData.array cannot be assigned
println("array[0] = " + fd.array[0]);
fd.array[0] = 4;
println("array[0] = " + fd.array[0]);
/* output:
array[0] = 0
array[0] = 4
*/
}
}

final字段是必须初始化完成的,即便不在构造函数之前,也要在构造函数中完成。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class BlankFinal {
private final int i = 0; // Initialized final
private final int j; // Blank final
private final Poppet p; // Blank final reference
// Initialize all blank finals in the constructor.
public BlankFinal(int x) {
j = x;
p = new Poppet(x);
}
public static void main(String[] args) {
new BlankFinal(10);
}
}

修饰方法和类

用final修饰的方法不能被继承后override;final类则不能被继承,String类就是final类。这个平时基本没怎么用过。TIJ的作者也表示慎用final来修饰方法和类。

缘起

之前项目,有个任务需要将每天的批量任务运算结果插入到mysql数据库中,以备后续查询,运算结果条数大概在100w左右,如果一条条插入,需要几十分钟,竟然和产生数据的批量任务运行时间相当。经过优化之后,插入性能差不多有10倍的提升。在这里记录一下调研调优的整个过程,权当自我总结。^_^

mysql数据插入方法

  • 调用Statement相关方法,有单个调用和批量处理两种
  • 调用PreparedStatement相关方法,同样有两种方式
  • load方法,很好很强大

下面一一介绍这些方法,并随带样例代码加以说明。

Database Connection

为了方便说明,只是简单做了一下包装。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class DbConnectionManager {
private static DbConfiguration dbConfiguration = null;
static {
dbConfiguration = DbConfiguration.instance();
try {
Class.forName(dbConfiguration.getDriver());
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
}
public static Connection getConnection() throws SQLException {
String url = dbConfiguration.getUrl();
String username = dbConfiguration.getUsername();
String password = dbConfiguration.getPassword();
Connection connection = null;
connection = DriverManager.getConnection(url, username, password);
connection.setAutoCommit(false); // 为了提高批量插入的性能
return connection;
}
}

Statement

调用Statement相关方法,单个插入。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public long runInsertViaStatement(List<DataUnit> data) {
long costTime = 0;
Connection connection = null;
Statement statement = null;
try {
connection = DbConnectionManager.getConnection();
statement = connection.createStatement();
long begin = System.currentTimeMillis();
for (DataUnit record : data) {
String sql = String.format("insert ignore into term_uv values ('%s', '%s', %s, %s)",
record.term, record.dt, record.uv, record.platform);
statement.execute(sql);
}
connection.commit();
costTime += (System.currentTimeMillis() - begin);
} catch (SQLException e) {
e.printStackTrace();
} finally {
try {
statement.close();
connection.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
return costTime;
}

StatementBatch

批量插入数据,每1000条数据执行一次executeBatch。

1
2
3
4
5
6
7
8
9
10
11
12
statement = connection.createStatement();
int count = 0;
for (DataUnit record : data) {
count += 1;
String sql = String.format("insert ignore into term_uv values('%s', '%s', %s, %s)",
record.term, record.dt, record.uv, record.platform);
statement.addBatch(sql);
if (count % 1000 == 0) {
statement.executeBatch();
}
}
statement.executeBatch();

PreparedStatement

这种方式也是一条条插入,但是不用重复为每条sql解析语法,所以相对于Statement方法,会快一些,但是在实验中,并没有差多少,可能是sql太过简单。用PreparedStatement也有缺点,就是一个statement只能为一条sql语句服务。

1
2
3
4
5
6
7
8
9
String sql = "insert ignore into term_uv (term, dt, uv, platform) values (?, ?, ?, ?)";
statement = connection.prepareStatement(sql);
for (DataUnit record : data) {
statement.setString(1, record.term);
statement.setString(2, record.dt);
statement.setString(3, record.uv);
statement.setString(4, record.platform);
statement.executeUpdate();
}

PreparedStatementBatch

这个就是上面几种方法的终极组合。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
String sql = "insert ignore into term_uv (term, dt, uv, platform) values (?, ?, ?, ?)";
statement = connection.prepareStatement(sql);
int count = 0;
for (DataUnit record : data) {
count += 1;
statement.setString(1, record.term);
statement.setString(2, record.dt);
statement.setString(3, record.uv);
statement.setString(4, record.platform);
statement.addBatch();
if (count % 1000 == 0) {
statement.executeBatch();
}
}
statement.executeBatch();

Load

直接通过文件方式load到mysql中,这应该是最快的方式,实验结果也是如此。但是缺点就是要把数据写到文件中,然后load到mysql中。

1
2
3
statement = connection.createStatement();
String sql = String.format("load data local infile '%s' replace into table term_uv", filename);
statement.executeUpdate(sql);

###实验结果

数据量 Statement PreparedStatement StatementBatch PreparedStatementBatch Load
1w 273231 ms 273045 ms 274518 ms 273063 ms 1002 ms

除了load,其它方法基本差不多,当时也没有多调查,就直接采用load。

rewriteBatchedStatements

做完项目之后,google了一把,才发现原来默认情况下,jdbc对批量执行还是一条条处理。要开启批量模式,需要在建立connection的url参数中设置rewriteBatchedStatements为true。

1
jdbc:mysql://<host>:<port>/<db>?rewriteBatchedStatements=true

新的实验结果如下:

数据量 Statement PreparedStatement StatementBatch PreparedStatementBatch Load
1w 273231 ms 273045 ms 2555 ms 1181 ms 1002 ms
10w 18361 ms 7850 ms 6812 ms

P.S. 对于term字段中包含单引号的需要转义

1
term.replace("'", "''");