Saturday 26 July 2014

Publish Consume Blob and Stream Message with JMS ActiveMQ

A few months back, I was scratching my head and Googling all day for a way to publish and consume blob data (such as PDF, Word, and Excel files) through a message queue. I couldn't find any solution other than some tips and a special URL for blob messages. So I'm writing this tutorial on publishing and consuming Blob and Stream messages through a message queue. I will be using ActiveMQ for this tutorial.
Note: ActiveMQ supports BlobMessage. It is specific to ActiveMQ; JMS itself does not have a BlobMessage type, but it does support StreamMessage.

 Producer:

import java.io.File;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.BlobMessage;

public class BlobMessageProducer {
private Connection connection = null;
private ActiveMQSession session = null;
private Destination destination = null;
private MessageProducer producer = null;
private File file;

  private void init(String fileName) throws Exception {
file = new File(fileName);
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
"tcp://localhost:61616?jms.blobTransferPolicy.defaultUploadUrl=http://localhost:8161/fileserver/");
connection = connectionFactory.createConnection();
session = (ActiveMQSession) connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
destination = session.createQueue("File.Transport");
producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
connection.start();
}

public void sendFile(String fileName) {
try {
 System.out.println("Send File Start >>");
init(fileName);
BlobMessage blobMessage = session.createBlobMessage(file);
blobMessage.setStringProperty("FILE.NAME", file.getName());
blobMessage.setLongProperty("FILE.SIZE", file.length());
producer.send(blobMessage);
  System.out.println("Send File End>>");
} catch (Exception e) {

} finally {
close();
}
}

private void close() {

try {
if (connection != null) {
connection.close();
}
logger.info("--producer close end--");
} catch (JMSException e) {

}
System.exit(0);
}

public static void main(String argv[]) {
String fileName = "/home/kuntal/practice/config-data/test.pdf";
new BlobMessageProducer().sendFile(fileName);
}

}



Consumer:

import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.BlobMessage;

/**
 * Drains {@link BlobMessage}s from the {@code File.Transport} queue and writes
 * each one into a target directory, named after the message's
 * {@code FILE.NAME} property.
 */
public class BlobMessageConsumer {
    private static final String BROKER_URL = "tcp://localhost:61616";
    private static final String QUEUE_NAME = "File.Transport";
    private static final long RECEIVE_TIMEOUT_MS = 5000;

    private MessageConsumer consumer;
    private Connection connection = null;
    private Session session = null;
    private Destination destination = null;

    /**
     * Opens the connection, session and queue consumer.
     *
     * @throws Exception if the broker connection cannot be established
     */
    private void init() throws Exception {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
        connection = connectionFactory.createConnection();
        connection.start();
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        destination = session.createQueue(QUEUE_NAME);
        consumer = session.createConsumer(destination);
    }

    /**
     * Receives blob messages until the queue stays empty for the receive
     * timeout, saving each blob under {@code targetFilePath}. Non-blob
     * messages are ignored. Failures are reported on standard error; the
     * connection is always closed.
     *
     * @param targetFilePath directory in which received files are created
     */
    public void receiveFile(String targetFilePath) {
        try {
            init();
            while (true) {
                Message message = consumer.receive(RECEIVE_TIMEOUT_MS);
                if (message == null) {
                    break;
                }
                if (message instanceof BlobMessage) {
                    saveBlob((BlobMessage) message, targetFilePath);
                }
            }
        } catch (Exception e) {
            // Previously swallowed silently, which hid download/broker errors.
            System.err.println("Receive File Failed: " + e);
            e.printStackTrace();
        } finally {
            close();
        }
    }

    /**
     * Copies one blob's content to a file, closing both streams afterwards so
     * that every received file is flushed (not only the last one).
     */
    private void saveBlob(BlobMessage blobMessage, String targetFilePath)
            throws IOException, JMSException {
        String fileName = blobMessage.getStringProperty("FILE.NAME");
        // The property comes from the sender: keep only the last path element
        // so it cannot point outside the target directory.
        String safeName = (fileName == null) ? "" : new File(fileName).getName();
        if (safeName.isEmpty() || ".".equals(safeName) || "..".equals(safeName)) {
            safeName = "blob-" + System.currentTimeMillis();
        }

        InputStream inputStream = blobMessage.getInputStream();
        if (inputStream == null) {
            throw new IOException("BlobMessage has no content stream: " + safeName);
        }
        try {
            OutputStream out = new BufferedOutputStream(
                    new FileOutputStream(new File(targetFilePath, safeName)));
            try {
                byte[] buffer = new byte[2048];
                int length;
                while ((length = inputStream.read(buffer)) != -1) {
                    out.write(buffer, 0, length);
                }
            } finally {
                out.close();
            }
        } finally {
            inputStream.close();
        }
    }

    /** Closes the connection (and with it the session and consumer). */
    private void close() {
        try {
            if (connection != null) {
                connection.close();
            }
        } catch (JMSException e) {
            System.err.println("--close connection fail-- " + e);
        }
    }

    public static void main(String[] args) {
        String targetFileFolder = "/home/kuntal/practice/config-data/output";
        new BlobMessageConsumer().receiveFile(targetFileFolder);
        // Kept from the original sample: force JVM exit once the queue is drained.
        System.exit(0);
    }
}

For streaming data, JMS provides StreamMessage, which is very handy for pushing and pulling streaming data (such as log or text files) through the message queue. This is how to publish and consume streaming data with ActiveMQ.

Producer:
/**
 * Reads a file in 1 KB chunks and sends each chunk to {@code queue1} as a
 * separate JMS {@link StreamMessage} containing one bytes field.
 */
public class StreamProducer {
    private Connection connection;
    private Session session;
    private Destination destination;
    private MessageProducer producer;
    private InputStream in;

    private static final Logger logger = Logger.getLogger(StreamProducer.class);

    /**
     * Sends the file's content chunk by chunk. Failures are logged; the input
     * stream and the connection are always closed.
     *
     * @param fileName path of the file to send
     */
    public void sendFile(String fileName) {
        logger.info("--sendFile start--");
        try {
            init(fileName);
            byte[] buffer = new byte[1024];
            int c;
            while ((c = in.read(buffer)) != -1) {
                StreamMessage smsg = session.createStreamMessage();
                smsg.writeBytes(buffer, 0, c);
                producer.send(smsg);
                logger.info("send: " + c);
            }
            logger.info("--sendFile end--");
        } catch (Exception e) {
            logger.error("--sendFile fail--", e);
        } finally {
            close();
        }
    }

    /**
     * Opens the input file, then the connection, session and producer.
     *
     * @param fileName path of the file to read
     * @throws Exception if the file cannot be opened or the broker connection
     *         cannot be established
     */
    private void init(String fileName) throws Exception {
        File file = new File(fileName);
        in = new FileInputStream(file);
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
                "tcp://localhost:61616");
        connection = factory.createConnection();
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        destination = session.createQueue("queue1");
        producer = session.createProducer(destination);
        // Optional: producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        connection.start();
    }

    /**
     * Closes the input stream and the connection. Each resource is closed in
     * its own try block so a failure on one does not leak the other.
     */
    private void close() {
        logger.info("--producer close start--");
        try {
            if (in != null) {
                in.close();
            }
        } catch (IOException e) {
            logger.error("--close InputStream fail--", e);
        }
        try {
            if (connection != null) {
                connection.close();
            }
        } catch (JMSException e) {
            logger.error("--close connection fail--", e);
        }
        logger.info("--producer close end--");
    }

    public static void main(String argv[]) {
        String fileName = "/home/kuntal/practice/config-data/test.txt";
        new StreamProducer().sendFile(fileName);
        // Kept from the original sample: force JVM exit once sending is done.
        System.exit(0);
    }

}

Consumer:
/**
 * Drains {@link StreamMessage}s from {@code queue1} and appends the bytes
 * field of each message to a single output file.
 *
 * <p>Bytes are written exactly as received. Decoding them to a
 * {@code String} and re-encoding (as an earlier version did) corrupts
 * binary content such as PDF files.
 */
public class StreamConsumer {
    private MessageConsumer consumer;
    private Connection connection = null;
    private Session session = null;
    private Destination destination = null;
    private static final Logger logger = Logger.getLogger(StreamConsumer.class);
    private BufferedOutputStream bos = null;

    /**
     * Opens the output file, then the connection, session and consumer.
     *
     * @param targetFileName path of the file to create
     * @throws Exception if the file cannot be created or the broker
     *         connection cannot be established
     */
    private void init(String targetFileName) throws Exception {
        logger.info("--init start--");
        logger.info("--targetFileName--" + targetFileName);
        OutputStream out = new FileOutputStream(targetFileName);
        bos = new BufferedOutputStream(out);
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
                "tcp://localhost:61616");
        connection = factory.createConnection();
        connection.start();
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        destination = session.createQueue("queue1");
        consumer = session.createConsumer(destination);
        logger.info("--init end--");
    }

    /**
     * Receives stream messages until the queue stays empty for five seconds,
     * writing their bytes to {@code targetFileName} in arrival order.
     * Failures are logged; the file and the connection are always closed.
     *
     * @param targetFileName path of the file to create
     */
    public void receiveFile(String targetFileName) {
        logger.info("--receive file start--");
        try {
            init(targetFileName);
            byte[] buffer = new byte[2048];
            while (true) {
                Message msg = consumer.receive(5000);
                if (msg == null) {
                    break;
                }

                if (msg instanceof StreamMessage) {
                    int received = copyBytesField((StreamMessage) msg, buffer);
                    logger.info("Receive: " + received);
                }
            }
            logger.info("--receive file end--");
        } catch (Exception e) {
            logger.error("--receive file fail--", e);
        } finally {
            close();
        }
    }

    /**
     * Copies one bytes field from the message to the output file.
     *
     * <p>{@code readBytes} must be called repeatedly until it returns fewer
     * bytes than the buffer holds; a full buffer means the field may continue,
     * and a short (or -1) result means the field is finished.
     *
     * @return number of bytes written
     */
    private int copyBytesField(StreamMessage smsg, byte[] buffer)
            throws JMSException, IOException {
        int total = 0;
        int c;
        do {
            c = smsg.readBytes(buffer);
            if (c > 0) {
                bos.write(buffer, 0, c);
                total += c;
            }
        } while (c == buffer.length);
        return total;
    }

    /**
     * Closes the output file and the connection. Each resource is closed in
     * its own try block so a failure on one does not leak the other.
     */
    private void close() {
        logger.info("--consumer close start--");
        try {
            if (bos != null) {
                bos.close();
            }
        } catch (IOException e) {
            logger.error("--close OutputStream fail--", e);
        }
        try {
            if (connection != null) {
                connection.close();
            }
        } catch (JMSException e) {
            logger.error("--close connection fail--", e);
        }
        logger.info("--consumer close end--");
    }

    public static void main(String[] args) {
        new StreamConsumer().receiveFile("/home/kuntal/practice/config-data/output2.txt");
        // Kept from the original sample: force JVM exit once the queue is drained.
        System.exit(0);
    }

}

Hope this helps you and saves your valuable time!!

18 comments:

  1. I am getting null when i execute below code.

    InputStream inputStream = blobMessage.getInputStream();
    System.out.println(inputStream);

    Kindly help me out.

    ReplyDelete
    Replies
    1. This comment has been removed by a blog administrator.

      Delete
  2. Hi Vikash,

    Sorry for the Late reply.

    I think you are doing some wrong imports.
    I have updated the blog with the imports as well for the BlobMessage Producer & Consumer. I have checked it and its running fine.

    ReplyDelete
    Replies
    1. This comment has been removed by the author.

      Delete
  3. Hi,

    I've tried both of the BlobMessage and the StreamMessage samples. The StreamMessage works fine for simple .txt files, but when i try to send .pdf files, it creates the file at the consumer side but the content of the file is never produced correctly, it always remains empty. Doesn't the StreamMessage support .pdf files or am i doing something wrong?

    And for the BlobMessage sample, i couldn't achieve sending any type of file (neither .txt nor .pdf). Here's my activemq.xml:


    -

    -
    -


    -
    -
    -




    -







    I use activemq v5.8.0. And didn't make any modifications in jetty.xml that comes within the activemq distribution.

    Any help will be appreciated, thank you...

    --
    Ekip

    ReplyDelete
  4. This comment has been removed by the author.

    ReplyDelete
  5. Thanks Kuntal Ganguly The shared example is very helpful for me!!

    ReplyDelete
  6. Code is working fine..thank you

    ReplyDelete
  7. yes, it's done! after enabling RESTful file access for blob messages.

    Thank you ! :)

    ReplyDelete
  8. Hi All,

    I am getting error by calling blobMessage.getInputStream() function of Consumer class.
    When I sent blob message file on Queue through Producer and trying to receive blob message through Consumer but receiving below error

    java.io.FileNotFoundException: http://localhost:8161/fileserver/ID:FCE8043-5027-1468315878677-0:0:1:1:2

    Please guide me on above.

    Thanks in advance :)

    ReplyDelete
  9. All, I am still getting the below error. Any help is appreciated.
    ***********
    javax.jms.JMSException: PUT was not successful: 401 Unauthorized
    at org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:72)
    at org.apache.activemq.command.ActiveMQBlobMessage.onSend(ActiveMQBlobMessage.java:177)

    ReplyDelete
  10. Its getting failed at
    ---->> producer.send(blobMessage);

    ReplyDelete
  11. Hola, me ayudó tu código, pero el StreamMessage no me permitió enviar archivos .pdf solo .txt. Hice un cambio a BytesMessage y añadí unas líneas más y me funcionó.

    ReplyDelete
  12. PRODUCER
    ________

    import java.io.BufferedOutputStream;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.io.OutputStream;

    import javax.jms.BytesMessage;
    import javax.jms.Connection;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.Session;

    import org.apache.activemq.ActiveMQConnectionFactory;

    /**
     * Drains {@link BytesMessage}s from a queue and appends each message body,
     * in arrival order, to a single output file.
     */
    public class LargeBytesMessageConsumer {
        private MessageConsumer consumer;
        private Connection connection = null;
        private Session session = null;
        private Destination destination = null;
        // NOTE(review): sample broker address and credentials; replace with
        // your own configuration rather than hard-coding real secrets.
        private String brokerUrl = "tcp://172.20.227.158:61616";
        private String user = "UserBroker2";
        private String pass = "123456";
        private String colaDestino = "queue1";

        private BufferedOutputStream bos = null;

        /**
         * Opens the connection, session and queue consumer.
         *
         * @param targetFileName unused here; the output file is opened by
         *        {@link #receiveFile(String)} after the connection succeeds
         * @throws Exception if the broker connection cannot be established
         */
        private void init(String targetFileName) throws Exception {
            System.out.println("--init start--");

            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(user, pass, brokerUrl);
            connection = factory.createConnection();
            connection.start();
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            destination = session.createQueue(colaDestino);
            consumer = session.createConsumer(destination);

            System.out.println("--init end--");
        }

        /**
         * Receives bytes messages until the queue stays empty for nine
         * seconds, writing their bodies to {@code targetFileName}. Failures
         * are printed; all resources are always closed.
         *
         * @param targetFileName path of the file to create
         */
        public void receiveFile(String targetFileName) {
            System.out.println("--receive file start--");

            try {
                init(targetFileName);

                OutputStream out = new FileOutputStream(targetFileName);
                bos = new BufferedOutputStream(out);

                // Fixed-size buffer: avoids allocating the whole body at once
                // and the int truncation of getBodyLength() for huge bodies.
                byte[] buffer = new byte[8192];
                while (true) {
                    Message msg = consumer.receive(9000);
                    if (msg == null) {
                        break;
                    }

                    if (msg instanceof BytesMessage) {
                        BytesMessage bytesMessage = (BytesMessage) msg;
                        int read;
                        // readBytes returns -1 once the body is exhausted;
                        // write only the bytes actually read.
                        while ((read = bytesMessage.readBytes(buffer)) != -1) {
                            bos.write(buffer, 0, read);
                        }
                    }
                }

                System.out.println("--receive file end--");
            } catch (Exception e) {
                System.out.println("--receive file fail--" + e);
            } finally {
                close();
            }
        }

        /**
         * Closes the output file and the JMS resources. Each resource is
         * closed in its own try block so one failure does not leak the rest.
         */
        private void close() {
            System.out.println("--consumer close start--");
            try {
                if (bos != null) {
                    bos.close();
                }
            } catch (IOException e) {
                System.out.println("--close OutputStream fail--" + e);
            }
            try {
                if (consumer != null) {
                    consumer.close();
                }
            } catch (JMSException e) {
                System.out.println("--close consumer fail--" + e);
            }
            try {
                if (session != null) {
                    session.close();
                }
            } catch (JMSException e) {
                System.out.println("--close session fail--" + e);
            }
            try {
                if (connection != null) {
                    connection.close();
                }
            } catch (JMSException e) {
                System.out.println("--close connection fail--" + e);
            }
            System.out.println("--consumer close end--");
        }

        public static void main(String[] args) {
            String filename = "/FilePJ/output/archivo.pdf";
            // Other sample targets: "/FilePJ/output/demo.txt", "/FilePJ/output/demo.docx"
            new LargeBytesMessageConsumer().receiveFile(filename);
        }

    }

    ReplyDelete
  13. This comment has been removed by the author.

    ReplyDelete
  14. CONSUMER
    -----------

    import java.io.BufferedOutputStream;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.io.OutputStream;

    import javax.jms.BytesMessage;
    import javax.jms.Connection;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.Session;

    import org.apache.activemq.ActiveMQConnectionFactory;

    /**
     * Drains {@link BytesMessage}s from a queue and appends each message body,
     * in arrival order, to a single output file.
     */
    public class LargeBytesMessageConsumer {
        private MessageConsumer consumer;
        private Connection connection = null;
        private Session session = null;
        private Destination destination = null;
        // NOTE(review): sample broker address and credentials; replace with
        // your own configuration rather than hard-coding real secrets.
        private String brokerUrl = "tcp://172.20.227.158:61616";
        private String user = "UserBroker2";
        private String pass = "123456";
        private String colaDestino = "queue1";

        private BufferedOutputStream bos = null;

        /**
         * Opens the connection, session and queue consumer.
         *
         * @param targetFileName unused here; the output file is opened by
         *        {@link #receiveFile(String)} after the connection succeeds
         * @throws Exception if the broker connection cannot be established
         */
        private void init(String targetFileName) throws Exception {
            System.out.println("--init start--");

            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(user, pass, brokerUrl);
            connection = factory.createConnection();
            connection.start();
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            destination = session.createQueue(colaDestino);
            consumer = session.createConsumer(destination);

            System.out.println("--init end--");
        }

        /**
         * Receives bytes messages until the queue stays empty for nine
         * seconds, writing their bodies to {@code targetFileName}. Failures
         * are printed; all resources are always closed.
         *
         * @param targetFileName path of the file to create
         */
        public void receiveFile(String targetFileName) {
            System.out.println("--receive file start--");

            try {
                init(targetFileName);

                OutputStream out = new FileOutputStream(targetFileName);
                bos = new BufferedOutputStream(out);

                // Fixed-size buffer: avoids allocating the whole body at once
                // and the int truncation of getBodyLength() for huge bodies.
                byte[] buffer = new byte[8192];
                while (true) {
                    Message msg = consumer.receive(9000);
                    if (msg == null) {
                        break;
                    }

                    if (msg instanceof BytesMessage) {
                        BytesMessage bytesMessage = (BytesMessage) msg;
                        int read;
                        // readBytes returns -1 once the body is exhausted;
                        // write only the bytes actually read.
                        while ((read = bytesMessage.readBytes(buffer)) != -1) {
                            bos.write(buffer, 0, read);
                        }
                    }
                }

                System.out.println("--receive file end--");
            } catch (Exception e) {
                System.out.println("--receive file fail--" + e);
            } finally {
                close();
            }
        }

        /**
         * Closes the output file and the JMS resources. Each resource is
         * closed in its own try block so one failure does not leak the rest.
         */
        private void close() {
            System.out.println("--consumer close start--");
            try {
                if (bos != null) {
                    bos.close();
                }
            } catch (IOException e) {
                System.out.println("--close OutputStream fail--" + e);
            }
            try {
                if (consumer != null) {
                    consumer.close();
                }
            } catch (JMSException e) {
                System.out.println("--close consumer fail--" + e);
            }
            try {
                if (session != null) {
                    session.close();
                }
            } catch (JMSException e) {
                System.out.println("--close session fail--" + e);
            }
            try {
                if (connection != null) {
                    connection.close();
                }
            } catch (JMSException e) {
                System.out.println("--close connection fail--" + e);
            }
            System.out.println("--consumer close end--");
        }

        public static void main(String[] args) {
            String filename = "/FilePJ/output/archivo.pdf";
            // Other sample targets: "/FilePJ/output/demo.txt", "/FilePJ/output/demo.docx"
            new LargeBytesMessageConsumer().receiveFile(filename);
        }

    }

    ReplyDelete
  15. can you plz let me know how to do the same with JMS. I have a similar requirement but I am supposed to do use JMS only.

    ReplyDelete