diff options
Diffstat (limited to 'test')
-rw-r--r-- | test/Makefile.am | 23 | ||||
-rw-r--r-- | test/lsudev.c | 180 | ||||
-rwxr-xr-x | test/mm-send-sms.py | 52 | ||||
-rw-r--r-- | test/mm-test-pppd-plugin.c | 264 | ||||
-rwxr-xr-x | test/mm-test.py | 527 |
5 files changed, 1046 insertions, 0 deletions
diff --git a/test/Makefile.am b/test/Makefile.am new file mode 100644 index 0000000..3d571c3 --- /dev/null +++ b/test/Makefile.am @@ -0,0 +1,23 @@ +if HAVE_PPPD_H + +pppd_plugindir = $(PPPD_PLUGIN_DIR) +pppd_plugin_LTLIBRARIES = mm-test-pppd-plugin.la + +mm_test_pppd_plugin_la_SOURCES = \ + mm-test-pppd-plugin.c + +mm_test_pppd_plugin_la_CPPFLAGS = $(MM_CFLAGS) +mm_test_pppd_plugin_la_LDFLAGS = -module -avoid-version +mm_test_pppd_plugin_la_LIBADD = $(MM_LIBS) + +endif + +noinst_PROGRAMS = lsudev +lsudev_SOURCES = lsudev.c +lsudev_CPPFLAGS = $(GUDEV_CFLAGS) +lsudev_LDADD = $(GUDEV_LIBS) + + +EXTRA_DIST = \ + mm-test.py + diff --git a/test/lsudev.c b/test/lsudev.c new file mode 100644 index 0000000..02d9ab2 --- /dev/null +++ b/test/lsudev.c @@ -0,0 +1,180 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: t; c-basic-offset: 4 -*- */ +/* This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Copyright (C) 2009 Red Hat, Inc. + */ + +#include <glib.h> +#define G_UDEV_API_IS_SUBJECT_TO_CHANGE +#include <gudev/gudev.h> + +#include <stdio.h> +#include <string.h> +#include <stdlib.h> +#include <stdarg.h> +#include <signal.h> + +static GMainLoop *loop = NULL; + +static void +signal_handler (int signo) +{ + if (signo == SIGINT || signo == SIGTERM) { + g_message ("Caught signal %d, shutting down...", signo); + g_main_loop_quit (loop); + } +} + +static void +setup_signals (void) +{ + struct sigaction action; + sigset_t mask; + + sigemptyset (&mask); + action.sa_handler = signal_handler; + action.sa_mask = mask; + action.sa_flags = 0; + sigaction (SIGTERM, &action, NULL); + sigaction (SIGINT, &action, NULL); +} + +static void +println (guint indent, const char *fmt, ...) +{ + va_list args; + char real_fmt[1000]; + int i; + + g_return_if_fail (fmt != NULL); + g_return_if_fail (indent < sizeof (real_fmt) - 2 - strlen (fmt)); + + for (i = 0; i < indent; i++) + real_fmt[i] = ' '; + strcpy (&real_fmt[i], fmt); + real_fmt[i + strlen (fmt)] = '\n'; + real_fmt[i + strlen (fmt) + 1] = '\0'; + + va_start (args, fmt); + vprintf (real_fmt, args); + va_end (args); +} + +static void +dump_device_and_parent (GUdevDevice *device, guint indent) +{ + const char **list, **iter; + GUdevDevice *parent; + char propstr[500]; + guint32 namelen = 0, i; + + println (indent, "------------------------------------------------------"); + println (indent, "Name: %s", g_udev_device_get_name (device)); + println (indent, "Type: %s", g_udev_device_get_devtype (device)); + println (indent, "Subsys: %s", g_udev_device_get_subsystem (device)); + println (indent, "Number: %s", g_udev_device_get_number (device)); + println (indent, "Path: %s", g_udev_device_get_sysfs_path (device)); + println (indent, "Driver: %s", g_udev_device_get_driver (device)); + println (indent, "Action: %s", g_udev_device_get_action (device)); + println (indent, "Seq Num: %s", g_udev_device_get_seqnum (device)); + println (indent, "Dev File: %s", g_udev_device_get_device_file (device)); + + println (indent, ""); + println (indent, "Properties:"); + + /* Get longest property name length for alignment */ + list = (const char **) g_udev_device_get_property_keys (device); + for (iter = list; iter && *iter; iter++) { + if (strlen (*iter) > namelen) + namelen = strlen (*iter); + } + namelen++; + + for (iter = list; iter && *iter; iter++) { + strcpy (propstr, *iter); + strcat (propstr, ":"); + for (i = 0; i < namelen - strlen (*iter); i++) + strcat (propstr, " "); + strcat (propstr, g_udev_device_get_property (device, *iter)); + println (indent + 2, "%s", propstr); + } + + println (indent, ""); + + parent = g_udev_device_get_parent (device); + if (parent) { + dump_device_and_parent (parent, indent + 4); + g_object_unref (parent); + } +} + +static void +handle_uevent (GUdevClient *client, + const char *action, + GUdevDevice *device, + gpointer user_data) +{ + const char *expected_subsys = user_data; + const char *subsys; + + g_return_if_fail (client != NULL); + g_return_if_fail (action != NULL); + g_return_if_fail (device != NULL); + + /* A bit paranoid */ + subsys = g_udev_device_get_subsystem (device); + g_return_if_fail (subsys != NULL); + + g_return_if_fail (!strcmp (subsys, expected_subsys)); + + g_print ("---- (EVENT: %s) ----\n", action); + dump_device_and_parent (device, 0); + g_print ("\n"); +} + +int +main (int argc, char *argv[]) +{ + GUdevClient *client; + const char *subsys[2] = { NULL, NULL }; + GList *list, *iter; + + if (argc != 2) { + g_warning ("Usage: %s [subsystem]", argv[0]); + return 1; + } + + g_type_init (); + + loop = g_main_loop_new (NULL, FALSE); + + setup_signals (); + + subsys[0] = argv[1]; + client = g_udev_client_new (subsys); + g_signal_connect (client, "uevent", G_CALLBACK (handle_uevent), (gpointer) subsys[0]); + + list = g_udev_client_query_by_subsystem (client, subsys[0]); + for (iter = list; iter; iter = g_list_next (iter)) { + dump_device_and_parent (G_UDEV_DEVICE (iter->data), 0); + g_print ("\n"); + g_object_unref (G_UDEV_DEVICE (iter->data)); + } + + g_main_loop_run (loop); + + return 0; +} + diff --git a/test/mm-send-sms.py b/test/mm-send-sms.py new file mode 100755 index 0000000..55d453c --- /dev/null +++ b/test/mm-send-sms.py @@ -0,0 +1,52 @@ +#!/usr/bin/python +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details: +# +# Copyright (C) 2009 Novell, Inc. +# + +# An example on how to send an SMS message using ModemManager + +import sys +import dbus + +if len(sys.argv) != 3: + print "Usage: %s <number> <message>" % sys.argv[0] + sys.exit(1) + +number = sys.argv[1] +message = sys.argv[2] + +bus = dbus.SystemBus() + +manager_proxy = bus.get_object('org.freedesktop.ModemManager', '/org/freedesktop/ModemManager') +manager_iface = dbus.Interface(manager_proxy, dbus_interface='org.freedesktop.ModemManager') +modems = manager_iface.EnumerateDevices() +if len(modems) == 0: + print "No modems found" + sys.exit(1) + +proxy = bus.get_object('org.freedesktop.ModemManager', modems[0]) +modem = dbus.Interface(proxy, dbus_interface='org.freedesktop.ModemManager.Modem') +modem.Enable(True) + +msg_dict = dbus.Dictionary({ dbus.String('number') : dbus.String(number), + dbus.String('text') : dbus.String(message) + }, + signature=dbus.Signature("sv")) + +sms_iface = dbus.Interface(proxy, dbus_interface='org.freedesktop.ModemManager.Modem.Gsm.SMS') +try: + sms_iface.Send(msg_dict) +except: + print "Sending message failed" +finally: + modem.Enable(False) diff --git a/test/mm-test-pppd-plugin.c b/test/mm-test-pppd-plugin.c new file mode 100644 index 0000000..75163fc --- /dev/null +++ b/test/mm-test-pppd-plugin.c @@ -0,0 +1,264 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: t; c-basic-offset: 4 -*- */ +/* This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Copyright (C) 2008 Novell, Inc. + * Copyright (C) 2008 - 2009 Red Hat, Inc. + */ + +#include <string.h> +#include <pppd/pppd.h> +#include <pppd/fsm.h> +#include <pppd/ipcp.h> +#include <sys/socket.h> +#include <netinet/in.h> +#include <arpa/inet.h> +#include <glib.h> + +int plugin_init (void); + +char pppd_version[] = VERSION; +char *my_user = NULL; +char *my_pass = NULL; +char *my_file = NULL; + +static void +mm_phasechange (void *data, int arg) +{ + const char *ppp_phase = NULL; + + switch (arg) { + case PHASE_DEAD: + ppp_phase = "dead"; + break; + case PHASE_INITIALIZE: + ppp_phase = "initialize"; + break; + case PHASE_SERIALCONN: + ppp_phase = "serial connection"; + break; + case PHASE_DORMANT: + ppp_phase = "dormant"; + break; + case PHASE_ESTABLISH: + ppp_phase = "establish"; + break; + case PHASE_AUTHENTICATE: + ppp_phase = "authenticate"; + break; + case PHASE_CALLBACK: + ppp_phase = "callback"; + break; + case PHASE_NETWORK: + ppp_phase = "network"; + break; + case PHASE_RUNNING: + ppp_phase = "running"; + break; + case PHASE_TERMINATE: + ppp_phase = "terminate"; + break; + case PHASE_DISCONNECT: + ppp_phase = "disconnect"; + break; + case PHASE_HOLDOFF: + ppp_phase = "holdoff"; + break; + case PHASE_MASTER: + ppp_phase = "master"; + break; + default: + ppp_phase = "unknown"; + break; + } + + g_message ("mm-test-ppp-plugin: (%s): phase now '%s'", __func__, ppp_phase); +} + +static void +append_ip4_addr (GString *str, const char *tag, guint32 addr) +{ + char buf[INET_ADDRSTRLEN + 2]; + struct in_addr tmp_addr = { .s_addr = addr }; + + memset (buf, 0, sizeof (buf)); + + if (inet_ntop (AF_INET, &tmp_addr, buf, sizeof (buf) - 1)) + g_string_append_printf (str, "%s %s\n", tag, buf); +} + +static void +mm_ip_up (void *data, int arg) +{ + ipcp_options opts = ipcp_gotoptions[0]; + ipcp_options peer_opts = ipcp_hisoptions[0]; + guint32 pppd_made_up_address = htonl (0x0a404040 + ifunit); + GString *contents; + GError *err = NULL; + gboolean success; + + g_message ("mm-test-ppp-plugin: (%s): ip-up event", __func__); + + if (!opts.ouraddr) { + g_warning ("mm-test-ppp-plugin: (%s): didn't receive an internal IP from pppd!", __func__); + mm_phasechange (NULL, PHASE_DEAD); + return; + } + + contents = g_string_sized_new (100); + + g_string_append_printf (contents, "iface %s\n", ifname); + + append_ip4_addr (contents, "addr", opts.ouraddr); + + /* Prefer the peer options remote address first, _unless_ pppd made the + * address up, at which point prefer the local options remote address, + * and if that's not right, use the made-up address as a last resort. + */ + if (peer_opts.hisaddr && (peer_opts.hisaddr != pppd_made_up_address)) + append_ip4_addr (contents, "gateway", peer_opts.hisaddr); + else if (opts.hisaddr) + append_ip4_addr (contents, "gateway", opts.hisaddr); + else if (peer_opts.hisaddr == pppd_made_up_address) { + /* As a last resort, use the made-up address */ + append_ip4_addr (contents, "gateway", peer_opts.hisaddr); + } + + if (opts.dnsaddr[0] || opts.dnsaddr[1]) { + if (opts.dnsaddr[0]) + append_ip4_addr (contents, "dns1", opts.dnsaddr[0]); + if (opts.dnsaddr[1]) + append_ip4_addr (contents, "dns2", opts.dnsaddr[1]); + } + + if (opts.winsaddr[0] || opts.winsaddr[1]) { + if (opts.winsaddr[0]) + append_ip4_addr (contents, "wins1", opts.winsaddr[0]); + if (opts.winsaddr[1]) + append_ip4_addr (contents, "wins2", opts.winsaddr[1]); + } + + g_string_append (contents, "DONE\n"); + + success = g_file_set_contents (my_file, contents->str, -1, &err); + if (success) + g_message ("nm-ppp-plugin: (%s): saved IP4 config to %s", __func__, my_file); + else { + g_message ("nm-ppp-plugin: (%s): error saving IP4 config to %s: (%d) %s", + __func__, my_file, err->code, err->message); + g_clear_error (&err); + } + + g_string_free (contents, TRUE); +} + +static int +get_chap_check() +{ + return 1; +} + +static int +get_pap_check() +{ + return 1; +} + +static int +get_credentials (char *username, char *password) +{ + size_t len; + + if (username && !password) { + /* pppd is checking pap support; return 1 for supported */ + return 1; + } + + g_message ("nm-ppp-plugin: (%s): sending credentials (%s / %s)", + __func__, + my_user ? my_user : "", + my_pass ? my_pass : ""); + + if (my_user) { + len = strlen (my_user) + 1; + len = len < MAXNAMELEN ? len : MAXNAMELEN; + + strncpy (username, my_user, len); + username[len - 1] = '\0'; + } + + if (my_pass) { + len = strlen (my_pass) + 1; + len = len < MAXSECRETLEN ? len : MAXSECRETLEN; + + strncpy (password, my_pass, len); + password[len - 1] = '\0'; + } + + return 1; +} + +static void +mm_exit_notify (void *data, int arg) +{ + g_message ("mm-test-ppp-plugin: (%s): cleaning up", __func__); + + g_free (my_user); + my_user = NULL; + g_free (my_pass); + my_pass = NULL; + g_free (my_file); + my_file = NULL; +} + +int +plugin_init (void) +{ + char **args; + + g_message ("mm-test-ppp-plugin: (%s): initializing", __func__); + + /* mm-test passes the file + username + password in as the 'ipparam' arg + * to pppd. + */ + args = g_strsplit (ipparam, "+", 0); + if (!args || g_strv_length (args) != 3) { + g_message ("mm-test-ppp-plugin: (%s): ipparam arguments error ('%s')", + __func__, ipparam); + return -1; + } + + my_user = (args[0] && strlen (args[0])) ? g_strdup (args[0]) : NULL; + my_pass = (args[1] && strlen (args[1])) ? g_strdup (args[1]) : NULL; + my_file = (args[2] && strlen (args[2])) ? g_strdup (args[2]) : NULL; + + g_strfreev (args); + + if (!my_file) { + g_message ("mm-test-ppp-plugin: (%s): missing IP config file", + __func__); + return -1; + } + + chap_passwd_hook = get_credentials; + chap_check_hook = get_chap_check; + pap_passwd_hook = get_credentials; + pap_check_hook = get_pap_check; + + add_notifier (&phasechange, mm_phasechange, NULL); + add_notifier (&ip_up_notifier, mm_ip_up, NULL); + add_notifier (&exitnotify, mm_exit_notify, NULL); + + return 0; +} diff --git a/test/mm-test.py b/test/mm-test.py new file mode 100755 index 0000000..99a355f --- /dev/null +++ b/test/mm-test.py @@ -0,0 +1,527 @@ +#!/usr/bin/python +# -*- Mode: python; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details: +# +# Copyright (C) 2008 Novell, Inc. +# Copyright (C) 2009 Red Hat, Inc. +# + +import sys, dbus, time, os, string, subprocess, socket + +DBUS_INTERFACE_PROPERTIES='org.freedesktop.DBus.Properties' +MM_DBUS_SERVICE='org.freedesktop.ModemManager' +MM_DBUS_PATH='/org/freedesktop/ModemManager' +MM_DBUS_INTERFACE='org.freedesktop.ModemManager' +MM_DBUS_INTERFACE_MODEM='org.freedesktop.ModemManager.Modem' +MM_DBUS_INTERFACE_MODEM_CDMA='org.freedesktop.ModemManager.Modem.Cdma' +MM_DBUS_INTERFACE_MODEM_GSM_CARD='org.freedesktop.ModemManager.Modem.Gsm.Card' +MM_DBUS_INTERFACE_MODEM_GSM_NETWORK='org.freedesktop.ModemManager.Modem.Gsm.Network' +MM_DBUS_INTERFACE_MODEM_SIMPLE='org.freedesktop.ModemManager.Modem.Simple' + +def get_cdma_band_class(band_class): + if band_class == 1: + return "800MHz" + elif band_class == 2: + return "1900MHz" + else: + return "Unknown" + +def get_reg_state(state): + if state == 1: + return "registered (roaming unknown)" + elif state == 2: + return "registered on home network" + elif state == 3: + return "registered on roaming network" + else: + return "unknown" + +def cdma_inspect(proxy, dump_private): + cdma = dbus.Interface(proxy, dbus_interface=MM_DBUS_INTERFACE_MODEM_CDMA) + + esn = "<private>" + if dump_private: + try: + esn = cdma.GetEsn() + except dbus.exceptions.DBusException: + esn = "<unavailable>" + + print "" + print "ESN: %s" % esn + + try: + (cdma_1x_state, evdo_state) = cdma.GetRegistrationState() + print "1x State: %s" % get_reg_state (cdma_1x_state) + print "EVDO State: %s" % get_reg_state (evdo_state) + except dbus.exceptions.DBusException, e: + print "Error reading registration state: %s" % e + + try: + quality = cdma.GetSignalQuality() + print "Signal quality: %d" % quality + except dbus.exceptions.DBusException, e: + print "Error reading signal quality: %s" % e + + try: + info = cdma.GetServingSystem() + print "Class: %s" % get_cdma_band_class(info[0]) + print "Band: %s" % info[1] + print "SID: %d" % info[2] + except dbus.exceptions.DBusException, e: + print "Error reading serving system: %s" % e + +def cdma_connect(proxy, user, password): + # Modem.Simple interface + simple = dbus.Interface(proxy, dbus_interface=MM_DBUS_INTERFACE_MODEM_SIMPLE) + try: + simple.Connect({'number':"#777"}, timeout=92) + print "\nConnected!" + return True + except Exception, e: + print "Error connecting: %s" % e + return False + + +def get_gsm_network_mode(modem): + mode = modem.GetNetworkMode() + if mode == 0x0: + mode = "Unknown" + elif mode == 0x1: + mode = "Any" + elif mode == 0x2: + mode = "GPRS" + elif mode == 0x4: + mode = "EDGE" + elif mode == 0x8: + mode = "UMTS" + elif mode == 0x10: + mode = "HSDPA" + elif mode == 0x20: + mode = "2G Preferred" + elif mode == 0x40: + mode = "3G Preferred" + elif mode == 0x80: + mode = "2G Only" + elif mode == 0x100: + mode = "3G Only" + elif mode == 0x200: + mode = "HSUPA" + elif mode == 0x400: + mode = "HSPA" + else: + mode = "(Unknown)" + + print "Mode: %s" % mode + +def get_gsm_band(modem): + band = modem.GetBand() + if band == 0x0: + band = "Unknown" + elif band == 0x1: + band = "Any" + elif band == 0x2: + band = "EGSM (900 MHz)" + elif band == 0x4: + band = "DCS (1800 MHz)" + elif band == 0x8: + band = "PCS (1900 MHz)" + elif band == 0x10: + band = "G850 (850 MHz)" + elif band == 0x20: + band = "U2100 (WCSMA 2100 MHZ, Class I)" + elif band == 0x40: + band = "U1700 (WCDMA 3GPP UMTS1800 MHz, Class III)" + elif band == 0x80: + band = "17IV (WCDMA 3GPP AWS 1700/2100 MHz, Class IV)" + elif band == 0x100: + band = "U800 (WCDMA 3GPP UMTS800 MHz, Class VI)" + elif band == 0x200: + band = "U850 (WCDMA 3GPP UMT850 MHz, Class V)" + elif band == 0x400: + band = "U900 (WCDMA 3GPP UMTS900 MHz, Class VIII)" + elif band == 0x800: + band = "U17IX (WCDMA 3GPP UMTS MHz, Class IX)" + else: + band = "(invalid)" + + print "Band: %s" % band + + +def gsm_inspect(proxy, dump_private, do_scan): + # Gsm.Card interface + card = dbus.Interface(proxy, dbus_interface=MM_DBUS_INTERFACE_MODEM_GSM_CARD) + + imei = "<private>" + imsi = "<private>" + if dump_private: + try: + imei = card.GetImei() + except dbus.exceptions.DBusException: + imei = "<unavailable>" + try: + imsi = card.GetImsi() + except dbus.exceptions.DBusException: + imsi = "<unavailable>" + + print "IMEI: %s" % imei + print "IMSI: %s" % imsi + + # Gsm.Network interface + net = dbus.Interface(proxy, dbus_interface=MM_DBUS_INTERFACE_MODEM_GSM_NETWORK) + try: + quality = net.GetSignalQuality() + print "Signal quality: %d" % quality + except dbus.exceptions.DBusException, e: + print "Error reading signal quality: %s" % e + + if not do_scan: + return + + print "Scanning..." + try: + results = net.Scan(timeout=120) + except dbus.exceptions.DBusException, e: + print "Error scanning: %s" % e + results = {} + + for r in results: + status = r['status'] + if status == "1": + status = "available" + elif status == "2": + status = "current" + elif status == "3": + status = "forbidden" + else: + status = "(Unknown)" + + access_tech = "" + try: + access_tech_num = r['access-tech'] + if access_tech_num == "0": + access_tech = "(GSM)" + elif access_tech_num == "1": + access_tech = "(Compact GSM)" + elif access_tech_num == "2": + access_tech = "(UMTS)" + elif access_tech_num == "3": + access_tech = "(EDGE)" + elif access_tech_num == "4": + access_tech = "(HSDPA)" + elif access_tech_num == "5": + access_tech = "(HSUPA)" + elif access_tech_num == "6": + access_tech = "(HSPA)" + except KeyError: + pass + + if r.has_key('operator-long') and len(r['operator-long']): + print "%s: %s %s" % (r['operator-long'], status, access_tech) + elif r.has_key('operator-short') and len(r['operator-short']): + print "%s: %s %s" % (r['operator-short'], status, access_tech) + else: + print "%s: %s %s" % (r['operator-num'], status, access_tech) + +def gsm_connect(proxy, apn, user, password): + # Modem.Simple interface + simple = dbus.Interface(proxy, dbus_interface=MM_DBUS_INTERFACE_MODEM_SIMPLE) + try: + opts = {'number':"*99#"} + if apn is not None: + opts['apn'] = apn + if user is not None: + opts['username'] = user + if password is not None: + opts['password'] = password + simple.Connect(opts, timeout=120) + print "\nConnected!" + return True + except Exception, e: + print "Error connecting: %s" % e + return False + +def pppd_find(): + paths = ["/usr/local/sbin/pppd", "/usr/sbin/pppd", "/sbin/pppd"] + for p in paths: + if os.path.exists(p): + return p + return None + +def ppp_start(device, user, password, tmpfile): + path = pppd_find() + if not path: + return None + + args = [path] + args += ["nodetach"] + args += ["lock"] + args += ["nodefaultroute"] + args += ["debug"] + if user: + args += ["user"] + args += [user] + args += ["noipdefault"] + args += ["115200"] + args += ["noauth"] + args += ["crtscts"] + args += ["modem"] + args += ["usepeerdns"] + args += ["ipparam"] + + ipparam = "" + if user: + ipparam += user + ipparam += "+" + if password: + ipparam += password + ipparam += "+" + ipparam += tmpfile + args += [ipparam] + + args += ["plugin"] + args += ["mm-test-pppd-plugin.so"] + + args += [device] + + return subprocess.Popen(args, close_fds=True, cwd="/", env={}) + +def ppp_wait(p, tmpfile): + i = 0 + while p.poll() == None and i < 30: + time.sleep(1) + if os.path.exists(tmpfile): + f = open(tmpfile, 'r') + stuff = f.read(500) + idx = string.find(stuff, "DONE") + f.close() + if idx >= 0: + return True + i += 1 + return False + +def ppp_stop(p): + import signal + p.send_signal(signal.SIGTERM) + p.wait() + +def ntop_helper(ip): + ip = socket.ntohl(ip) + n1 = ip >> 24 & 0xFF + n2 = ip >> 16 & 0xFF + n3 = ip >> 8 & 0xFF + n4 = ip & 0xFF + a = "%c%c%c%c" % (n1, n2, n3, n4) + return socket.inet_ntop(socket.AF_INET, a) + +def static_start(iface, modem): + (addr_num, dns1_num, dns2_num, dns3_num) = modem.GetIP4Config() + addr = ntop_helper(addr_num) + dns1 = ntop_helper(dns1_num) + dns2 = ntop_helper(dns2_num) + configure_iface(iface, addr, 0, dns1, dns2) + +def down_iface(iface): + ip = ["ip", "addr", "flush", "dev", iface] + print " ".join(ip) + subprocess.call(ip) + ip = ["ip", "link", "set", iface, "down"] + print " ".join(ip) + subprocess.call(ip) + +def configure_iface(iface, addr, gw, dns1, dns2): + print "\n\n******************************" + print "iface: %s" % iface + print "addr: %s" % addr + print "gw: %s" % gw + print "dns1: %s" % dns1 + print "dns2: %s" % dns2 + + ifconfig = ["ifconfig", iface, "%s/32" % addr] + if gw != 0: + ifconfig += ["pointopoint", gw] + print " ".join(ifconfig) + print "\n******************************\n" + + subprocess.call(ifconfig) + +def file_configure_iface(tmpfile): + addr = None + gw = None + iface = None + dns1 = None + dns2 = None + + f = open(tmpfile, 'r') + lines = f.readlines() + for l in lines: + if l.startswith("addr"): + addr = l[len("addr"):].strip() + if l.startswith("gateway"): + gw = l[len("gateway"):].strip() + if l.startswith("iface"): + iface = l[len("iface"):].strip() + if l.startswith("dns1"): + dns1 = l[len("dns1"):].strip() + if l.startswith("dns2"): + dns2 = l[len("dns2"):].strip() + f.close() + + configure_iface(iface, addr, gw, dns1, dns2) + return iface + +def try_ping(iface): + cmd = ["ping", "-I", iface, "-c", "4", "-i", "3", "-w", "20", "4.2.2.1"] + print " ".join(cmd) + retcode = subprocess.call(cmd) + if retcode != 0: + print "PING: failed" + else: + print "PING: success" + + +dump_private = False +connect = False +apn = None +user = None +password = None +do_ip = False +do_scan = True +x = 1 +while x < len(sys.argv): + if sys.argv[x] == "--private": + dump_private = True + elif sys.argv[x] == "--connect": + connect = True + elif (sys.argv[x] == "--user" or sys.argv[x] == "--username"): + x += 1 + user = sys.argv[x] + elif sys.argv[x] == "--apn": + x += 1 + apn = sys.argv[x] + elif sys.argv[x] == "--password": + x += 1 + password = sys.argv[x] + elif sys.argv[x] == "--ip": + do_ip = True + if os.geteuid() != 0: + print "You probably want to be root to use --ip" + sys.exit(1) + elif sys.argv[x] == "--no-scan": + do_scan = False + x += 1 + +bus = dbus.SystemBus() + +# Get available modems: +manager_proxy = bus.get_object('org.freedesktop.ModemManager', '/org/freedesktop/ModemManager') +manager_iface = dbus.Interface(manager_proxy, dbus_interface='org.freedesktop.ModemManager') +modems = manager_iface.EnumerateDevices() + +if not modems: + print "No modems found" + sys.exit(1) + +for m in modems: + connect_success = False + data_device = None + + proxy = bus.get_object(MM_DBUS_SERVICE, m) + + # Properties + props_iface = dbus.Interface(proxy, dbus_interface='org.freedesktop.DBus.Properties') + + type = props_iface.Get(MM_DBUS_INTERFACE_MODEM, 'Type') + if type == 1: + print "GSM modem" + elif type == 2: + print "CDMA modem" + else: + print "Invalid modem type: %d" % type + + print "Driver: '%s'" % (props_iface.Get(MM_DBUS_INTERFACE_MODEM, 'Driver')) + print "Modem device: '%s'" % (props_iface.Get(MM_DBUS_INTERFACE_MODEM, 'MasterDevice')) + data_device = props_iface.Get(MM_DBUS_INTERFACE_MODEM, 'Device') + print "Data device: '%s'" % data_device + + # Modem interface + modem = dbus.Interface(proxy, dbus_interface=MM_DBUS_INTERFACE_MODEM) + + try: + modem.Enable(True) + except dbus.exceptions.DBusException, e: + print "Error enabling modem: %s" % e + sys.exit(1) + + info = modem.GetInfo() + print "Vendor: %s" % info[0] + print "Model: %s" % info[1] + print "Version: %s" % info[2] + + if type == 1: + gsm_inspect(proxy, dump_private, do_scan) + if connect == True: + connect_success = gsm_connect(proxy, apn, user, password) + elif type == 2: + cdma_inspect(proxy, dump_private) + if connect == True: + connect_success = cdma_connect(proxy, user, password) + print + + if connect_success and do_ip: + tmpfile = "/tmp/mm-test-%d.tmp" % os.getpid() + success = False + try: + ip_method = props_iface.Get(MM_DBUS_INTERFACE_MODEM, 'IpMethod') + if ip_method == 0: + # ppp + p = ppp_start(data_device, user, password, tmpfile) + if ppp_wait(p, tmpfile): + data_device = file_configure_iface(tmpfile) + success = True + elif ip_method == 1: + # static + static_start(data_device, modem) + success = True + elif ip_method == 2: + # dhcp + pass + except Exception, e: + print "Error setting up IP: %s" % e + + if success: + try_ping(data_device) + print "Waiting for 30s..." + time.sleep(30) + + print "Disconnecting..." + try: + if ip_method == 0: + ppp_stop(p) + try: + os.remove(tmpfile) + except: + pass + elif ip_method == 1: + # static + down_iface(data_device) + elif ip_method == 2: + # dhcp + down_iface(data_device) + + modem.Disconnect() + except Exception, e: + print "Error tearing down IP: %s" % e + + time.sleep(5) + + modem.Enable(False) + |