diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..dab5030 --- /dev/null +++ b/Makefile @@ -0,0 +1,9 @@ +MYSQL_DEV_DIR=/usr/include/mysql +APR_DEV_DIR=/usr/include/apr-1.0/ +MYSQL_LIB_DIR=/usr/lib/mysql/plugin +APR_LIB_DIR=/usr/lib/x86_64-linux-gnu +#APR_LIB_DIR=/usr/lib/i386-linux-gnu + +install: + gcc -Wall -O2 -I$(APR_DEV_DIR) -I$(MYSQL_DEV_DIR) lib_mysqludf_stomp.c $(APR_LIB_DIR)/libapr-1.so -shared -o $(MYSQL_LIB_DIR)/lib_mysqludf_stomp.so -fPIC + diff --git a/README b/README deleted file mode 100644 index 7689df2..0000000 --- a/README +++ /dev/null @@ -1,33 +0,0 @@ -lib_mysqludf_stomp - a library to send STOMP messages -Copyright 2005 LogicBlaze Inc. -Copyright (C) 2011 Dmitry Demianov aka barlone - -this library use part of libstomp code - -web of STOMP project: http://stomp.codehaus.org/ -email: barlone@yandex.ru - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - -http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -implied. -See the License for the specific language governing permissions and -limitations under the License. - -Compile with (adapt the include and lib path to your environment): -> gcc -Wall -O2 -I/usr/local/include/apr-1 -> -I/usr/local/include/mysql \ - lib_mysqludf_stomp.c /usr/local/lib/libapr-1.so -shared \ - -o lib_mysqludf_stomp.so -> strip ./lib_mysqludf_stomp.so - -Add the functions to MySQL with: -mysql> CREATE FUNCTION stompsend RETURNS STRING SONAME "lib_mysqludf_stomp.so"; -mysql> CREATE FUNCTION stompsend1 RETURNS STRING SONAME "lib_mysqludf_stomp.so"; -mysql> CREATE FUNCTION stompsend2 RETURNS STRING SONAME "lib_mysqludf_stomp.so"; diff --git a/README.md b/README.md new file mode 100644 index 0000000..6995fc0 --- /dev/null +++ b/README.md @@ -0,0 +1,82 @@ +## lib_mysqludf_stomp + +A MySQL UDF library to send STOMP messages to a message broker. +Supports authentication using the stompsend2 function (see details below). + +## Instructions for Ubuntu + +### Install Dependencies +In order do compile the plugin, you need to install Apache Portable Runtime (APR) and MySQL client development packages. + +`sudo apt-get install gcc libapr1 libapr1-dev libmysqlclient-dev` + +### Compile & Install +To install the UDF, just run the provided install script which will compile and install the library. +The MySQL root password will be requested for installing. + +`sudo ./install.sh` + +#### If you get errors of missing libraries, edit the Makefile and make sure the paths are correct for your system. + +### Using the functions + +The plugin will provide you with 3 new functions you can use in your queries. +All of them take the same first 3 parameters (Hostname, Topic, Message) and the others are headers. +For authentication you must use `stompsend2`. +All parameters are strings : + +- stompsend(Hostname, Topic, Message); +- stompsend1(Hostname, Topic, Message, HeaderName, HeaderValue); +- stompsend2(Hostname, Topic, Message, Header1Name, Header1Value, Header2Name, Header2Value); + +### Example +To send the message "Hello broker" to the "Welcome" topic on server "127.0.0.1", just use : + +`SELECT stompsend("127.0.0.1","Welcome", "Hello broker");` + +If everything went well, you should get and "OK" response, else you will get a NULL: + +``` ++--------------------------------------------------+ +| stompsend("127.0.0.1","Welcome", "Hello broker") | ++--------------------------------------------------+ +| OK | ++--------------------------------------------------+ +1 row in set (0.00 sec) +``` + +If you need to authenticate, you can use the `stompsend2` function with the "login" and "passcode" as headers. +This function was modified in this fork to allow authentication by sending the headers on the CONNECT frame. + +`SELECT stompsend2("127.0.0.1","Welcome", "Hello broker", "login","guest","passcode","mypass");` + +If the credentials are invalid, you should receive the body of the message sent by the broker (ex: "Access refused for client 'guest'."). + +## Credits + +Copyright 2005 LogicBlaze Inc. + +Copyright (C) 2011 Dmitry Demianov aka barlone + +this library use part of libstomp code + +web of STOMP project: http://stomp.codehaus.org + +email: barlone@yandex.ru + +Authentication support on stompsend2 added by hugorosario + +## Licence + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +implied. +See the License for the specific language governing permissions and +limitations under the License. diff --git a/install.sh b/install.sh new file mode 100755 index 0000000..0c7a248 --- /dev/null +++ b/install.sh @@ -0,0 +1,24 @@ +#!/bin/bash + +echo "Compiling the MySQL UDF" +make + +if test $? -ne 0; then + echo "ERROR: You need libmysqlclient and Apache Portable Runtime development software installed " + echo "to be able to compile this UDF, on Debian/Ubuntu just run:" + echo "apt-get install libapr1 libapr1-dev libmysqlclient-dev" + exit 1 +else + echo "MySQL UDF compiled successfully" +fi + +echo -e "\nPlease provide your MySQL root password to install the UDF..." + +mysql -u root -p mysql < lib_mysqludf_stomp.sql + +if test $? -ne 0; then + echo "ERROR: unable to install the UDF" + exit 1 +else + echo "MySQL UDF installed successfully" +fi diff --git a/lib_mysqludf_stomp.c b/lib_mysqludf_stomp.c index c8a6992..61c7657 100644 --- a/lib_mysqludf_stomp.c +++ b/lib_mysqludf_stomp.c @@ -1,613 +1,867 @@ -/* - lib_mysqludf_stomp - a library to send STOMP messages - Copyright 2005 LogicBlaze Inc. - Copyright (C) 2011 Dmitry Demianov aka barlone - - this library use part of libstomp code - - web of STOMP project: http://stomp.codehaus.org/ - email: barlone@yandex.ru - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - - Compile with (adapt the include and lib path to your environment): - > gcc -Wall -O2 -I/usr/local/include/apr-1 -I/usr/local/include/mysql \ - lib_mysqludf_stomp.c /usr/local/lib/libapr-1.so -shared \ - -o lib_mysqludf_stomp.so - > strip ./lib_mysqludf_stomp.so - - Add the functions to MySQL with: - mysql> CREATE FUNCTION stompsend RETURNS STRING SONAME "lib_mysqludf_stomp.so"; - mysql> CREATE FUNCTION stompsend1 RETURNS STRING SONAME "lib_mysqludf_stomp.so"; - mysql> CREATE FUNCTION stompsend2 RETURNS STRING SONAME "lib_mysqludf_stomp.so"; -*/ - -#if defined(_WIN32) || defined(_WIN64) || defined(__WIN32__) || defined(WIN32) -#define DLLEXP __declspec(dllexport) -#else -#define DLLEXP -#endif - -#ifdef STANDARD -/* STANDARD is defined, don't use any mysql functions */ -#include -#include - -#ifdef __WIN__ -typedef unsigned __int64 ulonglong; /* Microsofts 64 bit types */ -typedef __int64 longlong; -#else -typedef unsigned long long ulonglong; -typedef long long longlong; -#endif /*__WIN__*/ - -#else -#include -#include -#include -#endif -#include -#include -#include "apr.h" -#include "apr_strings.h" -#include "apr_general.h" -#include "apr_network_io.h" -#include "apr_hash.h" - -#ifdef HAVE_DLOPEN - -#define LIBVERSION "lib_mysqludf_stomp version 0.2.0" - -/****************************************************************************** -** function declarations -******************************************************************************/ -#ifdef __cplusplus -extern "C" { -#endif - -DLLEXP -my_bool stompsend_init(UDF_INIT *initid, UDF_ARGS *args, char *message); -DLLEXP -void stompsend_deinit(UDF_INIT *initid); -DLLEXP -char *stompsend(UDF_INIT *initid, UDF_ARGS *args, char *result, - unsigned long *res_length, char *null_value, char *error); - -DLLEXP -my_bool stompsend1_init(UDF_INIT *initid, UDF_ARGS *args, char *message); -DLLEXP -void stompsend1_deinit(UDF_INIT *initid); -DLLEXP -char *stompsend1(UDF_INIT *initid, UDF_ARGS *args, char *result, - unsigned long *res_length, char *null_value, char *error); - -DLLEXP -my_bool stompsend2_init(UDF_INIT *initid, UDF_ARGS *args, char *message); -DLLEXP -void stompsend2_deinit(UDF_INIT *initid); -DLLEXP -char *stompsend2(UDF_INIT *initid, UDF_ARGS *args, char *result, - unsigned long *res_length, char *null_value, char *error); - -typedef struct stomp_connection { - apr_socket_t *socket; - apr_sockaddr_t *local_sa; - char *local_ip; - apr_sockaddr_t *remote_sa; - char *remote_ip; -} stomp_connection; - -typedef struct stomp_frame { - char *command; - apr_hash_t *headers; - char *body; - int body_length; -} stomp_frame; - -static apr_pool_t *pool; - -#ifdef __cplusplus -} -#endif -// ------------------------------------------------------------------------------ - -// STOMP functions - -/****************************************************************************** - * - * Used to establish a connection - * - ********************************************************************************/ -APR_DECLARE(apr_status_t) stomp_connect(stomp_connection **connection_ref, const char *hostname, int port, apr_pool_t *pool) -{ -apr_status_t rc; -int socket_family; -stomp_connection *connection=NULL; - - // - // Allocate the connection and a memory pool for the connection. - // - connection = apr_pcalloc(pool, sizeof(stomp_connection)); - if( connection == NULL ) - return APR_ENOMEM; - -#define CHECK_SUCCESS if( rc!=APR_SUCCESS ) { return rc; } - - // Look up the remote address - rc = apr_sockaddr_info_get(&connection->remote_sa, hostname, APR_UNSPEC, port, APR_IPV4_ADDR_OK, pool); - CHECK_SUCCESS; - - // Create the socket. - socket_family = connection->remote_sa->sa.sin.sin_family; - rc = apr_socket_create(&connection->socket, socket_family, SOCK_STREAM, APR_PROTO_TCP, pool); - CHECK_SUCCESS; - - // Set socket options. - rc = apr_socket_opt_set(connection->socket, APR_SO_NONBLOCK, 1); - CHECK_SUCCESS; - rc = apr_socket_timeout_set(connection->socket, 1 * APR_USEC_PER_SEC); - CHECK_SUCCESS; - - // Try connect - rc = apr_socket_connect(connection->socket, connection->remote_sa); - CHECK_SUCCESS; - - // Get the Socket Info - rc = apr_socket_addr_get(&connection->remote_sa, APR_REMOTE, connection->socket); - CHECK_SUCCESS; - rc = apr_sockaddr_ip_get(&connection->remote_ip, connection->remote_sa); - CHECK_SUCCESS; - rc = apr_socket_addr_get(&connection->local_sa, APR_LOCAL, connection->socket); - CHECK_SUCCESS; - rc = apr_sockaddr_ip_get(&connection->local_ip, connection->local_sa); - CHECK_SUCCESS; - - -#undef CHECK_SUCCESS - - *connection_ref = connection; - return rc; -} // stomp_connect - -APR_DECLARE(apr_status_t) stomp_disconnect(stomp_connection **connection_ref) -{ -apr_status_t result, rc; -stomp_connection *connection = *connection_ref; - - if( connection_ref == NULL || *connection_ref==NULL ) - return APR_EGENERAL; - - result = APR_SUCCESS; - rc = apr_socket_shutdown(connection->socket, APR_SHUTDOWN_WRITE); - if( result!=APR_SUCCESS ) - result = rc; - - if( connection->socket != NULL ) { - rc = apr_socket_close(connection->socket); - if( result!=APR_SUCCESS ) - result = rc; - connection->socket=NULL; - } - - *connection_ref=NULL; - return rc; -} // stomp_disconnect - - -/******************************************************************************** - * - * Wrappers around the apr_socket_send and apr_socket_recv calls so that they - * read/write their buffers fully. - * - ********************************************************************************/ -APR_DECLARE(apr_status_t) stomp_write_buffer(stomp_connection *connection, const char *data, apr_size_t size) -{ -apr_size_t remaining = size; - - size=0; - while( remaining>0 ) { - apr_size_t length = remaining; - apr_status_t rc = apr_socket_send(connection->socket, data, &length); - data+=length; - remaining -= length; - // size += length; - if( rc != APR_SUCCESS ) - return rc; - - } - return APR_SUCCESS; -} // stomp_write_buffer - - -/******************************************************************************** - * - * Handles reading and writing stomp_frames to and from the connection - * - ********************************************************************************/ -APR_DECLARE(apr_status_t) stomp_write(stomp_connection *connection, stomp_frame *frame, apr_pool_t* pool) { -apr_status_t rc; - -#define CHECK_SUCCESS if( rc!=APR_SUCCESS ) { return rc; } - // Write the command. - rc = stomp_write_buffer(connection, frame->command, strlen(frame->command)); - CHECK_SUCCESS; - rc = stomp_write_buffer(connection, "\n", 1); - CHECK_SUCCESS; - - // Write the headers - if( frame->headers != NULL ) { - - apr_hash_index_t *i; - const void *key; - void *value; - for (i = apr_hash_first(NULL, frame->headers); i; i = apr_hash_next(i)) { - apr_hash_this(i, &key, NULL, &value); - - rc = stomp_write_buffer(connection, key, strlen(key)); - CHECK_SUCCESS; - rc = stomp_write_buffer(connection, ":", 1); - CHECK_SUCCESS; - rc = stomp_write_buffer(connection, value, strlen(value)); - CHECK_SUCCESS; - rc = stomp_write_buffer(connection, "\n", 1); - CHECK_SUCCESS; - } - - if(frame->body_length >= 0) { - apr_pool_t *length_pool; - char *length_string; - - apr_pool_create(&length_pool, pool); - rc = stomp_write_buffer(connection, "content-length:", 15); - CHECK_SUCCESS; - - length_string = apr_itoa(length_pool, frame->body_length); - rc = stomp_write_buffer(connection, length_string, strlen(length_string)); - CHECK_SUCCESS; - rc = stomp_write_buffer(connection, "\n", 1); - CHECK_SUCCESS; - - apr_pool_destroy(length_pool); - } - } - rc = stomp_write_buffer(connection, "\n", 1); - CHECK_SUCCESS; - - // Write the body. - if( frame->body != NULL ) { - int body_length = frame->body_length; - if(body_length < 0) - body_length = strlen(frame->body); - rc = stomp_write_buffer(connection, frame->body, body_length); - CHECK_SUCCESS; - } - rc = stomp_write_buffer(connection, "\0\n", 2); - CHECK_SUCCESS; - -#undef CHECK_SUCCESS - - return APR_SUCCESS; -} // stomp_write - - -my_bool stompsend_init(UDF_INIT *initid, UDF_ARGS *args, char *message) -{ - - /* make sure user has provided exactly three string arguments */ - if (args->arg_count != 3 || (args->arg_type[0] != STRING_RESULT) - || (args->arg_type[1] != STRING_RESULT) - || (args->arg_type[2] != STRING_RESULT)){ - strcpy(message, "stompsend requires 3 string arguments"); - return 1; - } - - if ((args->lengths[0] == 0) || (args->lengths[1] == 0) || (args->lengths[2] == 0)){ - strcpy(message, "stompsend arguments can not be empty"); - return 1; - } - - // init APR - if (apr_initialize() != APR_SUCCESS) { - strcpy(message, "stompsend could not initialize APR"); - return 2; - } - - if (apr_pool_create(&pool, NULL) != APR_SUCCESS) { - strcpy(message, "stompsend could not allocate APR pool"); - return 3; - } - - initid->maybe_null=0; - - return 0; -} - -/****************************************************************************** -** purpose: deallocate memory allocated by str_translate_init(); this func -** is called once for each query which invokes str_translate(), -** it is called after all of the calls to str_translate() are done -** receives: pointer to UDF_INIT struct (the same which was used by -** str_translate_init() and str_translate()) -** returns: nothing -******************************************************************************/ -void stompsend_deinit(UDF_INIT *initid __attribute__((unused))) -{ - apr_pool_destroy(pool); -} -/******************************************************************************/ - -char *stompsend(UDF_INIT *initid, UDF_ARGS *args, - char *result, unsigned long *res_length, - char *null_value, char *error) -{ -stomp_connection *connection; -stomp_frame frame; -char *host = args->args[0]; -char *topic = args->args[1]; -char *message = args->args[2]; - - if (stomp_connect( &connection, host, 61613, pool) != APR_SUCCESS) { - strcpy(error, "stompsend could not connect to broker"); - *null_value = 1; - return NULL; - } - - frame.command = "CONNECT"; - frame.headers = apr_hash_make(pool); - frame.body = NULL; - frame.body_length = -1; - if (stomp_write(connection, &frame, pool) != APR_SUCCESS) { - strcpy(error, "stompsend could not send CONNECT frame"); - *null_value = 1; - return NULL; - } - - frame.command = "SEND"; - frame.headers = apr_hash_make(pool); - apr_hash_set(frame.headers, "destination", APR_HASH_KEY_STRING, topic); - frame.body_length = -1; - frame.body = message; - if (stomp_write(connection, &frame, pool) != APR_SUCCESS) { - strcpy(error, "stompsend could not send SEND frame"); - *null_value = 1; - return NULL; - } - - frame.command = "DISCONNECT"; - frame.headers = NULL; - frame.body_length = -1; - frame.body = NULL; - stomp_write(connection, &frame, pool); // ignore errors - - stomp_disconnect(&connection); - - *res_length = 2; - strcpy(result, "OK"); - - return result; -} // stompsend - - -my_bool stompsend1_init(UDF_INIT *initid, UDF_ARGS *args, char *message) -{ - - /* make sure user has provided exactly three string arguments */ - if (args->arg_count != 5 || (args->arg_type[0] != STRING_RESULT) - || (args->arg_type[1] != STRING_RESULT) - || (args->arg_type[2] != STRING_RESULT) - || (args->arg_type[3] != STRING_RESULT) - || (args->arg_type[4] != STRING_RESULT)){ - strcpy(message, "stompsend1 requires 5 string arguments"); - return 1; - } - - if ((args->lengths[0] == 0) || (args->lengths[1] == 0) || (args->lengths[2] == 0) || - (args->lengths[3] == 0) || (args->lengths[4] == 0)){ - strcpy(message, "stompsend1 arguments can not be empty"); - return 1; - } - - // init APR - if (apr_initialize() != APR_SUCCESS) { - strcpy(message, "stompsend1 could not initialize APR"); - return 2; - } - - if (apr_pool_create(&pool, NULL) != APR_SUCCESS) { - strcpy(message, "stompsend1 could not allocate APR pool"); - return 3; - } - - initid->maybe_null=0; - - return 0; -} // stompsend1_init - -/****************************************************************************** -** purpose: deallocate memory allocated by str_translate_init(); this func -** is called once for each query which invokes str_translate(), -** it is called after all of the calls to str_translate() are done -** receives: pointer to UDF_INIT struct (the same which was used by -** str_translate_init() and str_translate()) -** returns: nothing -******************************************************************************/ -void stompsend1_deinit(UDF_INIT *initid __attribute__((unused))) -{ - apr_pool_destroy(pool); -} // stompsend1_deinit -/******************************************************************************/ - -char *stompsend1(UDF_INIT *initid, UDF_ARGS *args, - char *result, unsigned long *res_length, - char *null_value, char *error) -{ -stomp_connection *connection; -stomp_frame frame; -char *host = args->args[0]; -char *topic = args->args[1]; -char *message = args->args[2]; -char *hdr1name = args->args[3]; -char *hdr1val = args->args[4]; - - if (stomp_connect( &connection, host, 61613, pool) != APR_SUCCESS) { - strcpy(error, "stompsend1 could not connect to broker"); - *null_value = 1; - return NULL; - } - - frame.command = "CONNECT"; - frame.headers = apr_hash_make(pool); - frame.body = NULL; - frame.body_length = -1; - if (stomp_write(connection, &frame, pool) != APR_SUCCESS) { - strcpy(error, "stompsend1 could not send CONNECT frame"); - *null_value = 1; - return NULL; - } - - frame.command = "SEND"; - frame.headers = apr_hash_make(pool); - apr_hash_set(frame.headers, "destination", APR_HASH_KEY_STRING, topic); - apr_hash_set(frame.headers, hdr1name, APR_HASH_KEY_STRING, hdr1val); - frame.body_length = -1; - frame.body = message; - if (stomp_write(connection, &frame, pool) != APR_SUCCESS) { - strcpy(error, "stompsend1 could not send SEND frame"); - *null_value = 1; - return NULL; - } - - frame.command = "DISCONNECT"; - frame.headers = NULL; - frame.body_length = -1; - frame.body = NULL; - stomp_write(connection, &frame, pool); // ignore errors - - stomp_disconnect(&connection); - - *res_length = 2; - strcpy(result, "OK"); - - return result; -} // stompsend1 - -my_bool stompsend2_init(UDF_INIT *initid, UDF_ARGS *args, char *message) -{ - - /* make sure user has provided exactly three string arguments */ - if (args->arg_count != 7 || (args->arg_type[0] != STRING_RESULT) - || (args->arg_type[1] != STRING_RESULT) - || (args->arg_type[2] != STRING_RESULT) - || (args->arg_type[3] != STRING_RESULT) - || (args->arg_type[4] != STRING_RESULT) - || (args->arg_type[5] != STRING_RESULT) - || (args->arg_type[6] != STRING_RESULT)){ - strcpy(message, "stompsend2 requires 7 string arguments"); - return 1; - } - - if ((args->lengths[0] == 0) || (args->lengths[1] == 0) || (args->lengths[2] == 0) || - (args->lengths[3] == 0) || (args->lengths[4] == 0) || - (args->lengths[5] == 0) || (args->lengths[6] == 0)){ - strcpy(message, "stompsend2 arguments can not be empty"); - return 1; - } - - // init APR - if (apr_initialize() != APR_SUCCESS) { - strcpy(message, "stompsend2 could not initialize APR"); - return 2; - } - - if (apr_pool_create(&pool, NULL) != APR_SUCCESS) { - strcpy(message, "stompsend2 could not allocate APR pool"); - return 3; - } - - initid->maybe_null=0; - - return 0; -} // stompsend1_init - -/****************************************************************************** -** purpose: deallocate memory allocated by str_translate_init(); this func -** is called once for each query which invokes str_translate(), -** it is called after all of the calls to str_translate() are done -** receives: pointer to UDF_INIT struct (the same which was used by -** str_translate_init() and str_translate()) -** returns: nothing -******************************************************************************/ -void stompsend2_deinit(UDF_INIT *initid __attribute__((unused))) -{ - apr_pool_destroy(pool); -} // stompsend2_deinit -/******************************************************************************/ - -char *stompsend2(UDF_INIT *initid, UDF_ARGS *args, - char *result, unsigned long *res_length, - char *null_value, char *error) -{ -stomp_connection *connection; -stomp_frame frame; -char *host = args->args[0]; -char *topic = args->args[1]; -char *message = args->args[2]; -char *hdr1name = args->args[3]; -char *hdr1val = args->args[4]; -char *hdr2name = args->args[5]; -char *hdr2val = args->args[6]; - - if (stomp_connect( &connection, host, 61613, pool) != APR_SUCCESS) { - strcpy(error, "stompsend2 could not connect to broker"); - *null_value = 1; - return NULL; - } - - frame.command = "CONNECT"; - frame.headers = apr_hash_make(pool); - frame.body = NULL; - frame.body_length = -1; - if (stomp_write(connection, &frame, pool) != APR_SUCCESS) { - strcpy(error, "stompsend2 could not send CONNECT frame"); - *null_value = 1; - return NULL; - } - - frame.command = "SEND"; - frame.headers = apr_hash_make(pool); - apr_hash_set(frame.headers, "destination", APR_HASH_KEY_STRING, topic); - apr_hash_set(frame.headers, hdr1name, APR_HASH_KEY_STRING, hdr1val); - apr_hash_set(frame.headers, hdr2name, APR_HASH_KEY_STRING, hdr2val); - frame.body_length = -1; - frame.body = message; - if (stomp_write(connection, &frame, pool) != APR_SUCCESS) { - strcpy(error, "stompsend2 could not send SEND frame"); - *null_value = 1; - return NULL; - } - - frame.command = "DISCONNECT"; - frame.headers = NULL; - frame.body_length = -1; - frame.body = NULL; - stomp_write(connection, &frame, pool); // ignore errors - - stomp_disconnect(&connection); - - *res_length = 2; - strcpy(result, "OK"); - - return result; -} // stompsend2 - -#endif /* HAVE_DLOPEN */ +/* + lib_mysqludf_stomp - a library to send STOMP messages + Copyright 2005 LogicBlaze Inc. + Copyright (C) 2011 Dmitry Demianov aka barlone + + Authentication support on stompsend2 added by hugorosario + + this library use part of libstomp code + + web of STOMP project: http://stomp.codehaus.org/ + email: barlone@yandex.ru + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +#if defined(_WIN32) || defined(_WIN64) || defined(__WIN32__) || defined(WIN32) +#define DLLEXP __declspec(dllexport) +#else +#define DLLEXP +#endif + +#ifdef STANDARD +/* STANDARD is defined, don't use any mysql functions */ +#include +#include + +#ifdef __WIN__ +typedef unsigned __int64 ulonglong; /* Microsofts 64 bit types */ +typedef __int64 longlong; +#else +typedef unsigned long long ulonglong; +typedef long long longlong; +#endif /*__WIN__*/ + +#else +#include +#include +#include +#endif +#include +#include +#include "apr.h" +#include "apr_strings.h" +#include "apr_general.h" +#include "apr_network_io.h" +#include "apr_hash.h" + +#ifdef HAVE_DLOPEN + +#define LIBVERSION "lib_mysqludf_stomp version 0.2.1" +#define BUFSIZE 4096 + +/****************************************************************************** +** function declarations +******************************************************************************/ +#ifdef __cplusplus +extern "C" { +#endif + +DLLEXP +my_bool stompsend_init(UDF_INIT *initid, UDF_ARGS *args, char *message); +DLLEXP +void stompsend_deinit(UDF_INIT *initid); +DLLEXP +char *stompsend(UDF_INIT *initid, UDF_ARGS *args, char *result, + unsigned long *res_length, char *null_value, char *error); + +DLLEXP +my_bool stompsend1_init(UDF_INIT *initid, UDF_ARGS *args, char *message); +DLLEXP +void stompsend1_deinit(UDF_INIT *initid); +DLLEXP +char *stompsend1(UDF_INIT *initid, UDF_ARGS *args, char *result, + unsigned long *res_length, char *null_value, char *error); + +DLLEXP +my_bool stompsend2_init(UDF_INIT *initid, UDF_ARGS *args, char *message); +DLLEXP +void stompsend2_deinit(UDF_INIT *initid); +DLLEXP +char *stompsend2(UDF_INIT *initid, UDF_ARGS *args, char *result, + unsigned long *res_length, char *null_value, char *error); + +typedef struct stomp_connection { + apr_socket_t *socket; + apr_sockaddr_t *local_sa; + char *local_ip; + apr_sockaddr_t *remote_sa; + char *remote_ip; +} stomp_connection; + +typedef struct stomp_frame { + char *command; + apr_hash_t *headers; + char *body; + int body_length; +} stomp_frame; + +typedef struct data_block_list { + char data[1024]; + struct data_block_list *next; +} data_block_list; + +static apr_pool_t *pool; + +#ifdef __cplusplus +} +#endif + +// ------------------------------------------------------------------------------ +// Helper functions +// ------------------------------------------------------------------------------ + +int prefix(const char *pre, const char *str){ + return strncmp(pre, str, strlen(pre)); +} + +// ------------------------------------------------------------------------------ +// STOMP functions +// ------------------------------------------------------------------------------ + + +/****************************************************************************** + * + * Used to establish a connection + * + ********************************************************************************/ +APR_DECLARE(apr_status_t) stomp_connect(stomp_connection **connection_ref, const char *hostname, int port, apr_pool_t *pool) +{ +apr_status_t rc; +int socket_family; +stomp_connection *connection=NULL; + + // + // Allocate the connection and a memory pool for the connection. + // + connection = apr_pcalloc(pool, sizeof(stomp_connection)); + if( connection == NULL ) + return APR_ENOMEM; + +#define CHECK_SUCCESS if( rc!=APR_SUCCESS ) { return rc; } + + // Look up the remote address + rc = apr_sockaddr_info_get(&connection->remote_sa, hostname, APR_UNSPEC, port, APR_IPV4_ADDR_OK, pool); + CHECK_SUCCESS; + + // Create the socket. + socket_family = connection->remote_sa->sa.sin.sin_family; + rc = apr_socket_create(&connection->socket, socket_family, SOCK_STREAM, APR_PROTO_TCP, pool); + CHECK_SUCCESS; + + // Set socket options. + rc = apr_socket_opt_set(connection->socket, APR_SO_NONBLOCK, 1); + CHECK_SUCCESS; + rc = apr_socket_timeout_set(connection->socket, 1 * APR_USEC_PER_SEC); + CHECK_SUCCESS; + + // Try connect + rc = apr_socket_connect(connection->socket, connection->remote_sa); + CHECK_SUCCESS; + + // Get the Socket Info + rc = apr_socket_addr_get(&connection->remote_sa, APR_REMOTE, connection->socket); + CHECK_SUCCESS; + rc = apr_sockaddr_ip_get(&connection->remote_ip, connection->remote_sa); + CHECK_SUCCESS; + rc = apr_socket_addr_get(&connection->local_sa, APR_LOCAL, connection->socket); + CHECK_SUCCESS; + rc = apr_sockaddr_ip_get(&connection->local_ip, connection->local_sa); + CHECK_SUCCESS; + + +#undef CHECK_SUCCESS + + *connection_ref = connection; + return rc; +} // stomp_connect + +APR_DECLARE(apr_status_t) stomp_disconnect(stomp_connection **connection_ref) +{ +apr_status_t result, rc; +stomp_connection *connection = *connection_ref; + + if( connection_ref == NULL || *connection_ref==NULL ) + return APR_EGENERAL; + + result = APR_SUCCESS; + rc = apr_socket_shutdown(connection->socket, APR_SHUTDOWN_WRITE); + if( result!=APR_SUCCESS ) + result = rc; + + if( connection->socket != NULL ) { + rc = apr_socket_close(connection->socket); + if( result!=APR_SUCCESS ) + result = rc; + connection->socket=NULL; + } + + *connection_ref=NULL; + return rc; +} // stomp_disconnect + + +/******************************************************************************** + * + * Wrappers around the apr_socket_send and apr_socket_recv calls so that they + * read/write their buffers fully. + * + ********************************************************************************/ +APR_DECLARE(apr_status_t) stomp_write_buffer(stomp_connection *connection, const char *data, apr_size_t size) +{ +apr_size_t remaining = size; + + size=0; + while( remaining>0 ) { + apr_size_t length = remaining; + apr_status_t rc = apr_socket_send(connection->socket, data, &length); + data+=length; + remaining -= length; + // size += length; + if( rc != APR_SUCCESS ) + return rc; + + } + return APR_SUCCESS; +} // stomp_write_buffer + +APR_DECLARE(apr_status_t) stomp_read_line(stomp_connection *connection, char **data, int* length, apr_pool_t *pool) +{ + apr_pool_t *tpool; + apr_status_t rc; + data_block_list *head, *tail; + apr_size_t i=0; + apr_size_t bytesRead=0; + char *p; + + rc = apr_pool_create(&tpool, pool); + if( rc != APR_SUCCESS ) { + return rc; + } + + head = tail = apr_pcalloc(tpool, sizeof(data_block_list)); + if( head == NULL ) + return APR_ENOMEM; + +#define CHECK_SUCCESS if( rc!=APR_SUCCESS ) { apr_pool_destroy(tpool); return rc; } + + while( 1 ) { + + apr_size_t length = 1; + apr_status_t rc = apr_socket_recv(connection->socket, tail->data+i, &length); + CHECK_SUCCESS; + + if( length==1 ) { + i++; + bytesRead++; + + // Keep reading bytes till end of line + if( tail->data[i-1]=='\n') { + // Null terminate the string instead of having the newline + tail->data[i-1] = 0; + break; + } else if( tail->data[i-1]==0 ) { + // Encountered 0 before end of line + apr_pool_destroy(tpool); + return APR_EGENERAL; + } + + // Do we need to allocate a new block? + if( i >= sizeof( tail->data) ) { + tail->next = apr_pcalloc(tpool, sizeof(data_block_list)); + if( tail->next == NULL ) { + apr_pool_destroy(tpool); + return APR_ENOMEM; + } + tail=tail->next; + i=0; + } + } + } + +#undef CHECK_SUCCESS + // Now we have the whole frame and know how big it is. Allocate it's buffer + *data = apr_pcalloc(pool, bytesRead); + p = *data; + if( p==NULL ) { + apr_pool_destroy(tpool); + return APR_ENOMEM; + } + + // Copy the frame over to the new buffer. + *length = bytesRead - 1; + for( ;head != NULL; head = head->next ) { + int len = bytesRead > sizeof(head->data) ? sizeof(head->data) : bytesRead; + memcpy(p,head->data,len); + p+=len; + bytesRead-=len; + } + + apr_pool_destroy(tpool); + return APR_SUCCESS; +} + +APR_DECLARE(apr_status_t) stomp_read_buffer(stomp_connection *connection, char **data, apr_pool_t *pool) +{ + apr_pool_t *tpool; + apr_status_t rc; + data_block_list *head, *tail; + apr_size_t i=0; + apr_size_t bytesRead=0; + char *p; + + rc = apr_pool_create(&tpool, pool); + if( rc != APR_SUCCESS ) { + return rc; + } + + head = tail = apr_pcalloc(tpool, sizeof(data_block_list)); + if( head == NULL ) + return APR_ENOMEM; + +#define CHECK_SUCCESS if( rc!=APR_SUCCESS ) { apr_pool_destroy(tpool); return rc; } + + // Keep reading bytes till end of frame is encountered. + while( 1 ) { + + apr_size_t length = 1; + apr_status_t rc = apr_socket_recv(connection->socket, tail->data+i, &length); + CHECK_SUCCESS; + + if( length==1 ) { + i++; + bytesRead++; + + // Keep reading bytes till end of frame + + if( tail->data[i-1]==0 ) { + break; + } + + // Do we need to allocate a new block? + if( i >= sizeof( tail->data) ) { + tail->next = apr_pcalloc(tpool, sizeof(data_block_list)); + if( tail->next == NULL ) { + apr_pool_destroy(tpool); + return APR_ENOMEM; + } + tail=tail->next; + i=0; + } + } + } +#undef CHECK_SUCCESS + + // Now we have the whole frame and know how big it is. Allocate it's buffer + *data = apr_pcalloc(pool, bytesRead); + p = *data; + if( p==NULL ) { + apr_pool_destroy(tpool); + return APR_ENOMEM; + } + + // Copy the frame over to the new buffer. + for( ;head != NULL; head = head->next ) { + int len = bytesRead > sizeof(head->data) ? sizeof(head->data) : bytesRead; + memcpy(p,head->data,len); + p+=len; + bytesRead-=len; + } + + apr_pool_destroy(tpool); + return APR_SUCCESS; +} + + +/******************************************************************************** + * + * Handles reading and writing stomp_frames to and from the connection + * + ********************************************************************************/ +APR_DECLARE(apr_status_t) stomp_write(stomp_connection *connection, stomp_frame *frame, apr_pool_t* pool) { +apr_status_t rc; + +#define CHECK_SUCCESS if( rc!=APR_SUCCESS ) { return rc; } + // Write the command. + rc = stomp_write_buffer(connection, frame->command, strlen(frame->command)); + CHECK_SUCCESS; + rc = stomp_write_buffer(connection, "\n", 1); + CHECK_SUCCESS; + + // Write the headers + if( frame->headers != NULL ) { + + apr_hash_index_t *i; + const void *key; + void *value; + for (i = apr_hash_first(NULL, frame->headers); i; i = apr_hash_next(i)) { + apr_hash_this(i, &key, NULL, &value); + + rc = stomp_write_buffer(connection, key, strlen(key)); + CHECK_SUCCESS; + rc = stomp_write_buffer(connection, ":", 1); + CHECK_SUCCESS; + rc = stomp_write_buffer(connection, value, strlen(value)); + CHECK_SUCCESS; + rc = stomp_write_buffer(connection, "\n", 1); + CHECK_SUCCESS; + } + + if(frame->body_length >= 0) { + apr_pool_t *length_pool; + char *length_string; + + apr_pool_create(&length_pool, pool); + rc = stomp_write_buffer(connection, "content-length:", 15); + CHECK_SUCCESS; + + length_string = apr_itoa(length_pool, frame->body_length); + rc = stomp_write_buffer(connection, length_string, strlen(length_string)); + CHECK_SUCCESS; + rc = stomp_write_buffer(connection, "\n", 1); + CHECK_SUCCESS; + + apr_pool_destroy(length_pool); + } + } + rc = stomp_write_buffer(connection, "\n", 1); + CHECK_SUCCESS; + + // Write the body. + if( frame->body != NULL ) { + int body_length = frame->body_length; + if(body_length < 0) + body_length = strlen(frame->body); + rc = stomp_write_buffer(connection, frame->body, body_length); + CHECK_SUCCESS; + } + rc = stomp_write_buffer(connection, "\0\n", 2); + CHECK_SUCCESS; + +#undef CHECK_SUCCESS + + return APR_SUCCESS; +} // stomp_write + + +APR_DECLARE(apr_status_t) stomp_read(stomp_connection *connection, stomp_frame **frame, apr_pool_t *pool) { + + apr_status_t rc; + stomp_frame *f; + + f = apr_pcalloc(pool, sizeof(stomp_frame)); + if( f == NULL ) + return APR_ENOMEM; + + f->headers = apr_hash_make(pool); + if( f->headers == NULL ) + return APR_ENOMEM; + +#define CHECK_SUCCESS if( rc!=APR_SUCCESS ) { return rc; } + + // Parse the frame out. + char *p; + int length; + + // Parse the command. + rc = stomp_read_line(connection, &p, &length, pool); + CHECK_SUCCESS; + + f->command = p; + + + // Start parsing the headers. + while( 1 ) { + rc = stomp_read_line(connection, &p, &length, pool); + CHECK_SUCCESS; + + // Done with headers + if(length == 0) + break; + + { + // Parse the header line. + char *p2; + void *key; + void *value; + + p2 = strstr(p,":"); + if( p2 == NULL ) { + // Expected at 1 : to delimit the key from the value. + return APR_EGENERAL; + } + + // Null terminate the key + *p2=0; + key = p; + + // The rest if the value. + value = p2+1; + + // Insert key/value into hash table. + apr_hash_set(f->headers, key, APR_HASH_KEY_STRING, value); + } + } + + + // The remainder of the buffer (including the \n at the end) is the body) + rc = stomp_read_buffer(connection, &f->body, pool); + CHECK_SUCCESS; + + +#undef CHECK_SUCCESS + *frame = f; + return APR_SUCCESS; +} + +my_bool stompsend_init(UDF_INIT *initid, UDF_ARGS *args, char *message) +{ + + /* make sure user has provided exactly three string arguments */ + if (args->arg_count != 3 || (args->arg_type[0] != STRING_RESULT) + || (args->arg_type[1] != STRING_RESULT) + || (args->arg_type[2] != STRING_RESULT)){ + strcpy(message, "stompsend requires 3 string arguments"); + return 1; + } + + if ((args->lengths[0] == 0) || (args->lengths[1] == 0) || (args->lengths[2] == 0)){ + strcpy(message, "stompsend arguments can not be empty"); + return 1; + } + + // init APR + if (apr_initialize() != APR_SUCCESS) { + strcpy(message, "stompsend could not initialize APR"); + return 2; + } + + if (apr_pool_create(&pool, NULL) != APR_SUCCESS) { + strcpy(message, "stompsend could not allocate APR pool"); + return 3; + } + + initid->maybe_null=0; + + return 0; +} + + +/****************************************************************************** +** purpose: deallocate memory allocated by str_translate_init(); this func +** is called once for each query which invokes str_translate(), +** it is called after all of the calls to str_translate() are done +** receives: pointer to UDF_INIT struct (the same which was used by +** str_translate_init() and str_translate()) +** returns: nothing +******************************************************************************/ +void stompsend_deinit(UDF_INIT *initid __attribute__((unused))) +{ + apr_pool_destroy(pool); +} +/******************************************************************************/ + +char *stompsend(UDF_INIT *initid, UDF_ARGS *args, + char *result, unsigned long *res_length, + char *null_value, char *error) +{ +stomp_connection *connection; +stomp_frame frame; +char *host = args->args[0]; +char *topic = args->args[1]; +char *message = args->args[2]; + + if (stomp_connect( &connection, host, 61613, pool) != APR_SUCCESS) { + strcpy(error, "stompsend could not connect to broker"); + *null_value = 1; + return NULL; + } + + frame.command = "CONNECT"; + frame.headers = apr_hash_make(pool); + frame.body = NULL; + frame.body_length = -1; + if (stomp_write(connection, &frame, pool) != APR_SUCCESS) { + strcpy(error, "stompsend could not send CONNECT frame"); + *null_value = 1; + return NULL; + } + + frame.command = "SEND"; + frame.headers = apr_hash_make(pool); + apr_hash_set(frame.headers, "destination", APR_HASH_KEY_STRING, topic); + frame.body_length = -1; + frame.body = message; + if (stomp_write(connection, &frame, pool) != APR_SUCCESS) { + strcpy(error, "stompsend could not send SEND frame"); + *null_value = 1; + return NULL; + } + + frame.command = "DISCONNECT"; + frame.headers = NULL; + frame.body_length = -1; + frame.body = NULL; + stomp_write(connection, &frame, pool); // ignore errors + + stomp_disconnect(&connection); + + *res_length = 2; + strcpy(result, "OK"); + + return result; +} // stompsend + + +my_bool stompsend1_init(UDF_INIT *initid, UDF_ARGS *args, char *message) +{ + + /* make sure user has provided exactly three string arguments */ + if (args->arg_count != 5 || (args->arg_type[0] != STRING_RESULT) + || (args->arg_type[1] != STRING_RESULT) + || (args->arg_type[2] != STRING_RESULT) + || (args->arg_type[3] != STRING_RESULT) + || (args->arg_type[4] != STRING_RESULT)){ + strcpy(message, "stompsend1 requires 5 string arguments"); + return 1; + } + + if ((args->lengths[0] == 0) || (args->lengths[1] == 0) || (args->lengths[2] == 0) || + (args->lengths[3] == 0) || (args->lengths[4] == 0)){ + strcpy(message, "stompsend1 arguments can not be empty"); + return 1; + } + + // init APR + if (apr_initialize() != APR_SUCCESS) { + strcpy(message, "stompsend1 could not initialize APR"); + return 2; + } + + if (apr_pool_create(&pool, NULL) != APR_SUCCESS) { + strcpy(message, "stompsend1 could not allocate APR pool"); + return 3; + } + + initid->maybe_null=0; + + return 0; +} // stompsend1_init + +/****************************************************************************** +** purpose: deallocate memory allocated by str_translate_init(); this func +** is called once for each query which invokes str_translate(), +** it is called after all of the calls to str_translate() are done +** receives: pointer to UDF_INIT struct (the same which was used by +** str_translate_init() and str_translate()) +** returns: nothing +******************************************************************************/ +void stompsend1_deinit(UDF_INIT *initid __attribute__((unused))) +{ + apr_pool_destroy(pool); +} // stompsend1_deinit +/******************************************************************************/ + +char *stompsend1(UDF_INIT *initid, UDF_ARGS *args, + char *result, unsigned long *res_length, + char *null_value, char *error) +{ +stomp_connection *connection; +stomp_frame frame; +char *host = args->args[0]; +char *topic = args->args[1]; +char *message = args->args[2]; +char *hdr1name = args->args[3]; +char *hdr1val = args->args[4]; + + if (stomp_connect( &connection, host, 61613, pool) != APR_SUCCESS) { + strcpy(error, "stompsend1 could not connect to broker"); + *null_value = 1; + return NULL; + } + + frame.command = "CONNECT"; + frame.headers = apr_hash_make(pool); + frame.body = NULL; + frame.body_length = -1; + if (stomp_write(connection, &frame, pool) != APR_SUCCESS) { + strcpy(error, "stompsend1 could not send CONNECT frame"); + *null_value = 1; + return NULL; + } + + frame.command = "SEND"; + frame.headers = apr_hash_make(pool); + apr_hash_set(frame.headers, "destination", APR_HASH_KEY_STRING, topic); + apr_hash_set(frame.headers, hdr1name, APR_HASH_KEY_STRING, hdr1val); + frame.body_length = -1; + frame.body = message; + if (stomp_write(connection, &frame, pool) != APR_SUCCESS) { + strcpy(error, "stompsend1 could not send SEND frame"); + *null_value = 1; + return NULL; + } + + frame.command = "DISCONNECT"; + frame.headers = NULL; + frame.body_length = -1; + frame.body = NULL; + stomp_write(connection, &frame, pool); // ignore errors + + stomp_disconnect(&connection); + + *res_length = 2; + strcpy(result, "OK"); + + return result; +} // stompsend1 + +my_bool stompsend2_init(UDF_INIT *initid, UDF_ARGS *args, char *message) +{ + + /* make sure user has provided exactly three string arguments */ + if (args->arg_count != 7 || (args->arg_type[0] != STRING_RESULT) + || (args->arg_type[1] != STRING_RESULT) + || (args->arg_type[2] != STRING_RESULT) + || (args->arg_type[3] != STRING_RESULT) + || (args->arg_type[4] != STRING_RESULT) + || (args->arg_type[5] != STRING_RESULT) + || (args->arg_type[6] != STRING_RESULT)){ + strcpy(message, "stompsend2 requires 7 string arguments"); + return 1; + } + + if ((args->lengths[0] == 0) || (args->lengths[1] == 0) || (args->lengths[2] == 0) || + (args->lengths[3] == 0) || (args->lengths[4] == 0) || + (args->lengths[5] == 0) || (args->lengths[6] == 0)){ + strcpy(message, "stompsend2 arguments can not be empty"); + return 1; + } + + // init APR + if (apr_initialize() != APR_SUCCESS) { + strcpy(message, "stompsend2 could not initialize APR"); + return 2; + } + + if (apr_pool_create(&pool, NULL) != APR_SUCCESS) { + strcpy(message, "stompsend2 could not allocate APR pool"); + return 3; + } + + initid->maybe_null=0; + + return 0; +} // stompsend1_init + + + +/****************************************************************************** +** purpose: deallocate memory allocated by str_translate_init(); this func +** is called once for each query which invokes str_translate(), +** it is called after all of the calls to str_translate() are done +** receives: pointer to UDF_INIT struct (the same which was used by +** str_translate_init() and str_translate()) +** returns: nothing +******************************************************************************/ +void stompsend2_deinit(UDF_INIT *initid __attribute__((unused))) +{ + apr_pool_destroy(pool); +} // stompsend2_deinit +/******************************************************************************/ + +char *stompsend2(UDF_INIT *initid, UDF_ARGS *args, + char *result, unsigned long *res_length, + char *null_value, char *error) +{ +stomp_connection *connection; +stomp_frame frame; +// stomp_frame *readframe; +char *host = args->args[0]; +char *topic = args->args[1]; +char *message = args->args[2]; +char *hdr1name = args->args[3]; +char *hdr1val = args->args[4]; +char *hdr2name = args->args[5]; +char *hdr2val = args->args[6]; + + if (stomp_connect( &connection, host, 61613, pool) != APR_SUCCESS) { + strcpy(error, "stompsend2 could not connect to broker"); + *null_value = 1; + return NULL; + } + + frame.command = "CONNECT"; + frame.headers = apr_hash_make(pool); + apr_hash_set(frame.headers, hdr1name, APR_HASH_KEY_STRING, hdr1val); + apr_hash_set(frame.headers, hdr2name, APR_HASH_KEY_STRING, hdr2val); + frame.body = NULL; + frame.body_length = -1; + if (stomp_write(connection, &frame, pool) != APR_SUCCESS) { + strcpy(error, "stompsend2 could not send CONNECT frame"); + *null_value = 1; + return NULL; + } + + // validate CONNECT response frame +/* + if (stomp_read(connection, &readframe, pool) != APR_SUCCESS) { + strcpy(error, "stompsend2 could not receive response"); + *null_value = 1; + return NULL; + } + + if (prefix("ERROR", readframe->command) == 0) { + if (strlen(readframe->body) > 0){ + char * line = readframe->body; + line[strlen(line) - 1] = '\0'; + strcpy(result, line); + *res_length = strlen(result); + return result; + }else{ + strcpy(error, "stompsend2 did not receive a CONNECTED response"); + *null_value = 1; + return NULL; + } + } +*/ + //CONNECT frame was successful, carry on with sending the message + + frame.command = "SEND"; + frame.headers = apr_hash_make(pool); + apr_hash_set(frame.headers, "destination", APR_HASH_KEY_STRING, topic); + apr_hash_set(frame.headers, hdr1name, APR_HASH_KEY_STRING, hdr1val); + apr_hash_set(frame.headers, hdr2name, APR_HASH_KEY_STRING, hdr2val); + frame.body_length = -1; + frame.body = message; + if (stomp_write(connection, &frame, pool) != APR_SUCCESS) { + strcpy(error, "stompsend2 could not send SEND frame"); + *null_value = 1; + return NULL; + } + + frame.command = "DISCONNECT"; + frame.headers = NULL; + frame.body_length = -1; + frame.body = NULL; + stomp_write(connection, &frame, pool); // ignore errors + + stomp_disconnect(&connection); + + *res_length = 2; + strcpy(result, "OK"); + + return result; +} // stompsend2 + +#endif /* HAVE_DLOPEN */ diff --git a/lib_mysqludf_stomp.so b/lib_mysqludf_stomp.so deleted file mode 100644 index 35a2f5b..0000000 Binary files a/lib_mysqludf_stomp.so and /dev/null differ diff --git a/lib_mysqludf_stomp.sql b/lib_mysqludf_stomp.sql new file mode 100644 index 0000000..fd9805b --- /dev/null +++ b/lib_mysqludf_stomp.sql @@ -0,0 +1,31 @@ +/* + lib_mysqludf_stomp - a library to send STOMP messages + Copyright 2005 LogicBlaze Inc. + Copyright (C) 2011 Dmitry Demianov aka barlone + + this library use part of libstomp code + + web of STOMP project: http://stomp.codehaus.org/ + email: barlone@yandex.ru + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +DROP FUNCTION IF EXISTS stompsend; +DROP FUNCTION IF EXISTS stompsend1; +DROP FUNCTION IF EXISTS stompsend2; + +CREATE FUNCTION stompsend RETURNS STRING SONAME "lib_mysqludf_stomp.so"; +CREATE FUNCTION stompsend1 RETURNS STRING SONAME "lib_mysqludf_stomp.so"; +CREATE FUNCTION stompsend2 RETURNS STRING SONAME "lib_mysqludf_stomp.so";