001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.transport.stomp; 018 019import java.io.IOException; 020import java.security.cert.X509Certificate; 021 022import javax.jms.JMSException; 023 024import org.apache.activemq.broker.BrokerContext; 025import org.apache.activemq.command.Command; 026import org.apache.activemq.transport.Transport; 027import org.apache.activemq.transport.TransportFilter; 028import org.apache.activemq.transport.TransportListener; 029import org.apache.activemq.transport.tcp.SslTransport; 030import org.apache.activemq.util.IOExceptionSupport; 031import org.apache.activemq.wireformat.WireFormat; 032import org.slf4j.Logger; 033import org.slf4j.LoggerFactory; 034 035/** 036 * The StompTransportFilter normally sits on top of a TcpTransport that has been 037 * configured with the StompWireFormat and is used to convert STOMP commands to 038 * ActiveMQ commands. All of the conversion work is done by delegating to the 039 * ProtocolConverter. 040 * 041 * @author <a href="http://hiramchirino.com">chirino</a> 042 */ 043public class StompTransportFilter extends TransportFilter implements StompTransport { 044 private static final Logger LOG = LoggerFactory.getLogger(StompTransportFilter.class); 045 private static final Logger TRACE = LoggerFactory.getLogger(StompTransportFilter.class.getPackage().getName() + ".StompIO"); 046 private final ProtocolConverter protocolConverter; 047 private StompInactivityMonitor monitor; 048 private StompWireFormat wireFormat; 049 050 private boolean trace; 051 052 public StompTransportFilter(Transport next, WireFormat wireFormat, BrokerContext brokerContext) { 053 super(next); 054 this.protocolConverter = new ProtocolConverter(this, brokerContext); 055 056 if (wireFormat instanceof StompWireFormat) { 057 this.wireFormat = (StompWireFormat) wireFormat; 058 } 059 } 060 061 public void oneway(Object o) throws IOException { 062 try { 063 final Command command = (Command) o; 064 protocolConverter.onActiveMQCommand(command); 065 } catch (JMSException e) { 066 throw IOExceptionSupport.create(e); 067 } 068 } 069 070 public void onCommand(Object command) { 071 try { 072 if (trace) { 073 TRACE.trace("Received: \n" + command); 074 } 075 076 protocolConverter.onStompCommand((StompFrame) command); 077 } catch (IOException e) { 078 onException(e); 079 } catch (JMSException e) { 080 onException(IOExceptionSupport.create(e)); 081 } 082 } 083 084 public void sendToActiveMQ(Command command) { 085 TransportListener l = transportListener; 086 if (l != null) { 087 l.onCommand(command); 088 } 089 } 090 091 public void sendToStomp(StompFrame command) throws IOException { 092 if (trace) { 093 TRACE.trace("Sending: \n" + command); 094 } 095 Transport n = next; 096 if (n != null) { 097 n.oneway(command); 098 } 099 } 100 101 public X509Certificate[] getPeerCertificates() { 102 if (next instanceof SslTransport) { 103 X509Certificate[] peerCerts = ((SslTransport) next).getPeerCertificates(); 104 if (trace && peerCerts != null) { 105 LOG.debug("Peer Identity has been verified\n"); 106 } 107 return peerCerts; 108 } 109 return null; 110 } 111 112 public boolean isTrace() { 113 return trace; 114 } 115 116 public void setTrace(boolean trace) { 117 this.trace = trace; 118 } 119 120 @Override 121 public StompInactivityMonitor getInactivityMonitor() { 122 return monitor; 123 } 124 125 public void setInactivityMonitor(StompInactivityMonitor monitor) { 126 this.monitor = monitor; 127 } 128 129 @Override 130 public StompWireFormat getWireFormat() { 131 return this.wireFormat; 132 } 133 134 public String getDefaultHeartBeat() { 135 return protocolConverter != null ? protocolConverter.getDefaultHeartBeat() : null; 136 } 137 138 public void setDefaultHeartBeat(String defaultHeartBeat) { 139 protocolConverter.setDefaultHeartBeat(defaultHeartBeat); 140 } 141 142}