123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555 |
- /*
- Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
- This file is part of libzmq, the ZeroMQ core engine in C++.
- libzmq is free software; you can redistribute it and/or modify it under
- the terms of the GNU Lesser General Public License (LGPL) as published
- by the Free Software Foundation; either version 3 of the License, or
- (at your option) any later version.
- As a special exception, the Contributors give you permission to link
- this library with independent modules to produce an executable,
- regardless of the license terms of these independent modules, and to
- copy and distribute the resulting executable under terms of your choice,
- provided that you also meet, for each linked independent module, the
- terms and conditions of the license of that module. An independent
- module is a module which is not derived from or based on this library.
- If you modify this library, you must extend this exception to your
- version of the library.
- libzmq is distributed in the hope that it will be useful, but WITHOUT
- ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
- License for more details.
- You should have received a copy of the GNU Lesser General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
- */
- #include "testutil.hpp"
- #include "testutil_unity.hpp"
- #include <string.h>
- #ifndef _WIN32
- #include <sys/socket.h>
- #include <netinet/in.h>
- #include <arpa/inet.h>
- #include <unistd.h>
- #endif
- // Helper macro to define the v4/v6 function pairs
- #define MAKE_TEST_V4V6(_test) \
- static void _test##_ipv4 () { _test (false); } \
- \
- static void _test##_ipv6 () \
- { \
- if (!is_ipv6_available ()) { \
- TEST_IGNORE_MESSAGE ("ipv6 is not available"); \
- } \
- _test (true); \
- }
- SETUP_TEARDOWN_TESTCONTEXT
- void msg_send_expect_success (void *s_, const char *group_, const char *body_)
- {
- zmq_msg_t msg;
- const size_t len = strlen (body_);
- TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init_size (&msg, len));
- memcpy (zmq_msg_data (&msg), body_, len);
- TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_set_group (&msg, group_));
- int rc = zmq_msg_send (&msg, s_, 0);
- TEST_ASSERT_EQUAL_INT ((int) len, rc);
- // TODO isn't the msg closed by zmq_msg_send?
- zmq_msg_close (&msg);
- }
- void msg_recv_cmp (void *s_, const char *group_, const char *body_)
- {
- zmq_msg_t msg;
- const size_t len = strlen (body_);
- TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&msg));
- int recv_rc = TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&msg, s_, 0));
- TEST_ASSERT_EQUAL_INT (len, recv_rc);
- TEST_ASSERT_EQUAL_STRING (group_, zmq_msg_group (&msg));
- TEST_ASSERT_EQUAL_STRING_LEN (body_, zmq_msg_data (&msg), len);
- zmq_msg_close (&msg);
- }
- void test_leave_unjoined_fails ()
- {
- void *dish = test_context_socket (ZMQ_DISH);
- // Leaving a group which we didn't join
- TEST_ASSERT_FAILURE_ERRNO (EINVAL, zmq_leave (dish, "Movies"));
- test_context_socket_close (dish);
- }
- void test_long_group ()
- {
- size_t len = MAX_SOCKET_STRING;
- char my_endpoint[MAX_SOCKET_STRING];
- void *radio = test_context_socket (ZMQ_RADIO);
- bind_loopback (radio, false, my_endpoint, len);
- void *dish = test_context_socket (ZMQ_DISH);
- // Joining to a long group, over 14 chars
- char group[19] = "0123456789ABCDEFGH";
- TEST_ASSERT_SUCCESS_ERRNO (zmq_join (dish, group));
- // Connecting
- TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (dish, my_endpoint));
- msleep (SETTLE_TIME);
- // This is going to be sent to the dish
- msg_send_expect_success (radio, group, "HELLO");
- // Check the correct message arrived
- msg_recv_cmp (dish, group, "HELLO");
- test_context_socket_close (dish);
- test_context_socket_close (radio);
- }
- void test_join_too_long_fails ()
- {
- void *dish = test_context_socket (ZMQ_DISH);
- // Joining too long group
- char too_long_group[ZMQ_GROUP_MAX_LENGTH + 2];
- for (int index = 0; index < ZMQ_GROUP_MAX_LENGTH + 2; index++)
- too_long_group[index] = 'A';
- too_long_group[ZMQ_GROUP_MAX_LENGTH + 1] = '\0';
- TEST_ASSERT_FAILURE_ERRNO (EINVAL, zmq_join (dish, too_long_group));
- test_context_socket_close (dish);
- }
- void test_join_twice_fails ()
- {
- void *dish = test_context_socket (ZMQ_DISH);
- TEST_ASSERT_SUCCESS_ERRNO (zmq_join (dish, "Movies"));
- // Duplicate Joining
- TEST_ASSERT_FAILURE_ERRNO (EINVAL, zmq_join (dish, "Movies"));
- test_context_socket_close (dish);
- }
- void test_radio_dish_tcp_poll (int ipv6_)
- {
- size_t len = MAX_SOCKET_STRING;
- char my_endpoint[MAX_SOCKET_STRING];
- void *radio = test_context_socket (ZMQ_RADIO);
- bind_loopback (radio, ipv6_, my_endpoint, len);
- void *dish = test_context_socket (ZMQ_DISH);
- TEST_ASSERT_SUCCESS_ERRNO (
- zmq_setsockopt (dish, ZMQ_IPV6, &ipv6_, sizeof (int)));
- // Joining
- TEST_ASSERT_SUCCESS_ERRNO (zmq_join (dish, "Movies"));
- // Connecting
- TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (dish, my_endpoint));
- msleep (SETTLE_TIME);
- // This is not going to be sent as dish only subscribe to "Movies"
- msg_send_expect_success (radio, "TV", "Friends");
- // This is going to be sent to the dish
- msg_send_expect_success (radio, "Movies", "Godfather");
- // Check the correct message arrived
- msg_recv_cmp (dish, "Movies", "Godfather");
- // Join group during connection optvallen
- TEST_ASSERT_SUCCESS_ERRNO (zmq_join (dish, "TV"));
- zmq_sleep (1);
- // This should arrive now as we joined the group
- msg_send_expect_success (radio, "TV", "Friends");
- // Check the correct message arrived
- msg_recv_cmp (dish, "TV", "Friends");
- // Leaving group
- TEST_ASSERT_SUCCESS_ERRNO (zmq_leave (dish, "TV"));
- zmq_sleep (1);
- // This is not going to be sent as dish only subscribe to "Movies"
- msg_send_expect_success (radio, "TV", "Friends");
- // This is going to be sent to the dish
- msg_send_expect_success (radio, "Movies", "Godfather");
- // test zmq_poll with dish
- zmq_pollitem_t items[] = {
- {radio, 0, ZMQ_POLLIN, 0}, // read publications
- {dish, 0, ZMQ_POLLIN, 0}, // read subscriptions
- };
- int rc = zmq_poll (items, 2, 2000);
- TEST_ASSERT_EQUAL_INT (1, rc);
- TEST_ASSERT_EQUAL_INT (ZMQ_POLLIN, items[1].revents);
- // Check the correct message arrived
- msg_recv_cmp (dish, "Movies", "Godfather");
- test_context_socket_close (dish);
- test_context_socket_close (radio);
- }
- MAKE_TEST_V4V6 (test_radio_dish_tcp_poll)
- void test_dish_connect_fails (int ipv6_)
- {
- void *dish = test_context_socket (ZMQ_DISH);
- TEST_ASSERT_SUCCESS_ERRNO (
- zmq_setsockopt (dish, ZMQ_IPV6, &ipv6_, sizeof (int)));
- const char *url = ipv6_ ? "udp://[::1]:5556" : "udp://127.0.0.1:5556";
- // Connecting dish should fail
- TEST_ASSERT_FAILURE_ERRNO (ENOCOMPATPROTO, zmq_connect (dish, url));
- test_context_socket_close (dish);
- }
- MAKE_TEST_V4V6 (test_dish_connect_fails)
- void test_radio_bind_fails (int ipv6_)
- {
- void *radio = test_context_socket (ZMQ_RADIO);
- TEST_ASSERT_SUCCESS_ERRNO (
- zmq_setsockopt (radio, ZMQ_IPV6, &ipv6_, sizeof (int)));
- // Connecting dish should fail
- // Bind radio should fail
- TEST_ASSERT_FAILURE_ERRNO (ENOCOMPATPROTO,
- zmq_bind (radio, "udp://*:5556"));
- test_context_socket_close (radio);
- }
- MAKE_TEST_V4V6 (test_radio_bind_fails)
- void test_radio_dish_udp (int ipv6_)
- {
- void *radio = test_context_socket (ZMQ_RADIO);
- void *dish = test_context_socket (ZMQ_DISH);
- TEST_ASSERT_SUCCESS_ERRNO (
- zmq_setsockopt (radio, ZMQ_IPV6, &ipv6_, sizeof (int)));
- TEST_ASSERT_SUCCESS_ERRNO (
- zmq_setsockopt (dish, ZMQ_IPV6, &ipv6_, sizeof (int)));
- const char *radio_url = ipv6_ ? "udp://[::1]:5556" : "udp://127.0.0.1:5556";
- TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (dish, "udp://*:5556"));
- TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (radio, radio_url));
- msleep (SETTLE_TIME);
- TEST_ASSERT_SUCCESS_ERRNO (zmq_join (dish, "TV"));
- msg_send_expect_success (radio, "TV", "Friends");
- msg_recv_cmp (dish, "TV", "Friends");
- test_context_socket_close (dish);
- test_context_socket_close (radio);
- }
- MAKE_TEST_V4V6 (test_radio_dish_udp)
- #define MCAST_IPV4 "226.8.5.5"
- #define MCAST_IPV6 "ff02::7a65:726f:6df1:0a01"
- static const char *mcast_url (int ipv6_)
- {
- if (ipv6_) {
- return "udp://[" MCAST_IPV6 "]:5555";
- }
- return "udp://" MCAST_IPV4 ":5555";
- }
- // OSX uses a different name for this socket option
- #ifndef IPV6_ADD_MEMBERSHIP
- #define IPV6_ADD_MEMBERSHIP IPV6_JOIN_GROUP
- #endif
- union sa_u
- {
- struct sockaddr generic;
- struct sockaddr_in ipv4;
- struct sockaddr_in6 ipv6;
- };
- // Test if multicast is available on this machine by attempting to
- // send a receive a multicast datagram
- static bool is_multicast_available (int ipv6_)
- {
- int family = ipv6_ ? AF_INET6 : AF_INET;
- fd_t bind_sock = retired_fd;
- fd_t send_sock = retired_fd;
- int port = 5555;
- bool success = false;
- const char *msg = "it works";
- char buf[32];
- union sa_u any;
- union sa_u mcast;
- socklen_t sl;
- int rc;
- if (ipv6_) {
- struct sockaddr_in6 *any_ipv6 = &any.ipv6;
- struct sockaddr_in6 *mcast_ipv6 = &mcast.ipv6;
- any_ipv6->sin6_family = AF_INET6;
- any_ipv6->sin6_port = htons (port);
- any_ipv6->sin6_flowinfo = 0;
- any_ipv6->sin6_scope_id = 0;
- rc = test_inet_pton (AF_INET6, "::", &any_ipv6->sin6_addr);
- if (rc == 0) {
- goto out;
- }
- *mcast_ipv6 = *any_ipv6;
- rc = test_inet_pton (AF_INET6, MCAST_IPV6, &mcast_ipv6->sin6_addr);
- if (rc == 0) {
- goto out;
- }
- sl = sizeof (*any_ipv6);
- } else {
- struct sockaddr_in *any_ipv4 = &any.ipv4;
- struct sockaddr_in *mcast_ipv4 = &mcast.ipv4;
- any_ipv4->sin_family = AF_INET;
- any_ipv4->sin_port = htons (5555);
- rc = test_inet_pton (AF_INET, "0.0.0.0", &any_ipv4->sin_addr);
- if (rc == 0) {
- goto out;
- }
- *mcast_ipv4 = *any_ipv4;
- rc = test_inet_pton (AF_INET, MCAST_IPV4, &mcast_ipv4->sin_addr);
- if (rc == 0) {
- goto out;
- }
- sl = sizeof (*any_ipv4);
- }
- bind_sock = socket (family, SOCK_DGRAM, IPPROTO_UDP);
- if (bind_sock < 0) {
- goto out;
- }
- send_sock = socket (family, SOCK_DGRAM, IPPROTO_UDP);
- if (bind_sock < 0) {
- goto out;
- }
- rc = bind (bind_sock, &any.generic, sl);
- if (rc < 0) {
- goto out;
- }
- if (ipv6_) {
- struct ipv6_mreq mreq;
- const sockaddr_in6 *const mcast_ipv6 = &mcast.ipv6;
- mreq.ipv6mr_multiaddr = mcast_ipv6->sin6_addr;
- mreq.ipv6mr_interface = 0;
- rc = setsockopt (bind_sock, IPPROTO_IPV6, IPV6_ADD_MEMBERSHIP,
- as_setsockopt_opt_t (&mreq), sizeof (mreq));
- if (rc < 0) {
- goto out;
- }
- int loop = 1;
- rc = setsockopt (send_sock, IPPROTO_IPV6, IPV6_MULTICAST_LOOP,
- as_setsockopt_opt_t (&loop), sizeof (loop));
- if (rc < 0) {
- goto out;
- }
- } else {
- struct ip_mreq mreq;
- const sockaddr_in *const mcast_ipv4 = &mcast.ipv4;
- mreq.imr_multiaddr = mcast_ipv4->sin_addr;
- mreq.imr_interface.s_addr = htonl (INADDR_ANY);
- rc = setsockopt (bind_sock, IPPROTO_IP, IP_ADD_MEMBERSHIP,
- as_setsockopt_opt_t (&mreq), sizeof (mreq));
- if (rc < 0) {
- goto out;
- }
- int loop = 1;
- rc = setsockopt (send_sock, IPPROTO_IP, IP_MULTICAST_LOOP,
- as_setsockopt_opt_t (&loop), sizeof (loop));
- if (rc < 0) {
- goto out;
- }
- }
- msleep (SETTLE_TIME);
- rc = sendto (send_sock, msg, static_cast<socklen_t> (strlen (msg)), 0,
- &mcast.generic, sl);
- if (rc < 0) {
- goto out;
- }
- msleep (SETTLE_TIME);
- rc = recvfrom (bind_sock, buf, sizeof (buf) - 1, 0, NULL, 0);
- if (rc < 0) {
- goto out;
- }
- buf[rc] = '\0';
- success = (strcmp (msg, buf) == 0);
- out:
- if (bind_sock >= 0) {
- close (bind_sock);
- }
- if (send_sock >= 0) {
- close (send_sock);
- }
- return success;
- }
- static void ignore_if_unavailable (int ipv6_)
- {
- if (ipv6_ && !is_ipv6_available ())
- TEST_IGNORE_MESSAGE ("No IPV6 available");
- if (!is_multicast_available (ipv6_))
- TEST_IGNORE_MESSAGE ("No multicast available");
- }
- static void test_radio_dish_mcast (int ipv6_)
- {
- ignore_if_unavailable (ipv6_);
- void *radio = test_context_socket (ZMQ_RADIO);
- void *dish = test_context_socket (ZMQ_DISH);
- TEST_ASSERT_SUCCESS_ERRNO (
- zmq_setsockopt (radio, ZMQ_IPV6, &ipv6_, sizeof (int)));
- TEST_ASSERT_SUCCESS_ERRNO (
- zmq_setsockopt (dish, ZMQ_IPV6, &ipv6_, sizeof (int)));
- const char *url = mcast_url (ipv6_);
- TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (dish, url));
- TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (radio, url));
- msleep (SETTLE_TIME);
- TEST_ASSERT_SUCCESS_ERRNO (zmq_join (dish, "TV"));
- msg_send_expect_success (radio, "TV", "Friends");
- msg_recv_cmp (dish, "TV", "Friends");
- test_context_socket_close (dish);
- test_context_socket_close (radio);
- }
- MAKE_TEST_V4V6 (test_radio_dish_mcast)
- static void test_radio_dish_no_loop (int ipv6_)
- {
- #ifdef _WIN32
- TEST_IGNORE_MESSAGE (
- "ZMQ_MULTICAST_LOOP=false does not appear to work on Windows (TODO)");
- #endif
- ignore_if_unavailable (ipv6_);
- void *radio = test_context_socket (ZMQ_RADIO);
- void *dish = test_context_socket (ZMQ_DISH);
- TEST_ASSERT_SUCCESS_ERRNO (
- zmq_setsockopt (radio, ZMQ_IPV6, &ipv6_, sizeof (int)));
- TEST_ASSERT_SUCCESS_ERRNO (
- zmq_setsockopt (dish, ZMQ_IPV6, &ipv6_, sizeof (int)));
- // Disable multicast loop for radio
- int loop = 0;
- TEST_ASSERT_SUCCESS_ERRNO (
- zmq_setsockopt (radio, ZMQ_MULTICAST_LOOP, &loop, sizeof (int)));
- const char *url = mcast_url (ipv6_);
- TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (dish, url));
- TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (radio, url));
- msleep (SETTLE_TIME);
- TEST_ASSERT_SUCCESS_ERRNO (zmq_join (dish, "TV"));
- msg_send_expect_success (radio, "TV", "Friends");
- // Looping is disabled, we shouldn't receive anything
- msleep (SETTLE_TIME);
- TEST_ASSERT_FAILURE_ERRNO (EAGAIN, zmq_recv (dish, NULL, 0, ZMQ_DONTWAIT));
- test_context_socket_close (dish);
- test_context_socket_close (radio);
- }
- MAKE_TEST_V4V6 (test_radio_dish_no_loop)
- int main (void)
- {
- setup_test_environment ();
- UNITY_BEGIN ();
- RUN_TEST (test_leave_unjoined_fails);
- RUN_TEST (test_join_too_long_fails);
- RUN_TEST (test_long_group);
- RUN_TEST (test_join_twice_fails);
- RUN_TEST (test_radio_bind_fails_ipv4);
- RUN_TEST (test_radio_bind_fails_ipv6);
- RUN_TEST (test_dish_connect_fails_ipv4);
- RUN_TEST (test_dish_connect_fails_ipv6);
- RUN_TEST (test_radio_dish_tcp_poll_ipv4);
- RUN_TEST (test_radio_dish_tcp_poll_ipv6);
- RUN_TEST (test_radio_dish_udp_ipv4);
- RUN_TEST (test_radio_dish_udp_ipv6);
- RUN_TEST (test_radio_dish_mcast_ipv4);
- RUN_TEST (test_radio_dish_no_loop_ipv4);
- RUN_TEST (test_radio_dish_mcast_ipv6);
- RUN_TEST (test_radio_dish_no_loop_ipv6);
- return UNITY_END ();
- }
|