Skip to content Skip to sidebar Skip to footer

Implement Eclipse Mqtt Android Client Using Single Connection Instance

I am using the Eclipse Paho Android MQTT service in my app. I am able to subscribe and publish messages to the MQTT broker. I have a couple of Activities in the app; when any activity is …

Solution 1:

A 'better' way would be to create a Service which connects/reconnects to the MQTT Broker.

I created my own service called MqttConnectionManagerService which maintains and manages connection to the broker.

Key features of this solution:

  1. Service maintains a single instance as long as it is alive.
  2. If the service is killed, Android restarts it (because onStartCommand() returns START_STICKY).
  3. Service can be started when device boots.
  4. Service runs in the background and is always connected to receive notifications.
  5. If the service is alive, calling startService(..) again would trigger its onStartCommand() method (and not onCreate()). In this method, we simply check if this client is connected to the broker and connect/reconnect if required.

Sample code:

MqttConnectionManagerService

publicclassMqttConnectionManagerServiceextendsService {

    privateMqttAndroidClient client;
    privateMqttConnectOptions options;

    @OverridepublicvoidonCreate() {
        super.onCreate();
        options = createMqttConnectOptions();
        client = createMqttAndroidClient();
    }


    @Overridepublic int onStartCommand(Intent intent, int flags, int startId) {
        this.connect(client, options);
        returnSTART_STICKY;
    }

    privateMqttConnectOptionscreateMqttConnectOptions() {
        //create and return options
    }

    privateMqttAndroidClientcreateMqttAndroidClient() {
        //create and return client
    }

    publicvoidconnect(final MqttAndroidClient client, MqttConnectOptions options) {

        try {
            if (!client.isConnected()) {
                IMqttToken token = client.connect(options);
                //on successful connection, publish or subscribe as usual
                token.setActionCallback(newIMqttActionListener() {..});
                client.setCallback(newMqttCallback() {..});
            }
        } catch (MqttException e) {
            //handle e
        }
    }

}

AndroidManifest.xml

<?xml version="1.0" encoding="utf-8"?>
<manifest xmlns:android="http://schemas.android.com/apk/res/android"
    package="...">

    <!-- Permissions required to receive BOOT_COMPLETED event -->
    <uses-permission android:name="android.permission.RECEIVE_BOOT_COMPLETED" />

    <application
        android:allowBackup="true"
        android:icon="@mipmap/ic_launcher"
        android:label="@string/app_name"
        android:supportsRtl="true"
        android:theme="@style/AppTheme">

        <!-- activities go here -->

        <!-- BroadcastReceiver that starts MqttConnectionManagerService on device boot -->
        <receiver android:name=".MqttServiceStartReceiver">
            <intent-filter>
                <action android:name="android.intent.action.BOOT_COMPLETED" />
            </intent-filter>
        </receiver>

        <!-- Services required for using MQTT -->
        <service android:name="org.eclipse.paho.android.service.MqttService" />
        <service android:name=".MqttConnectionManagerService" />
    </application>
</manifest>

MqttServiceStartReceiver

publicclassMqttServiceStartReceiverextendsBroadcastReceiver {    
    @OverridepublicvoidonReceive(Context context, Intent intent) {
        context.startService(newIntent(context, MqttConnectionManagerService.class));
    }
}

In your Activity's onResume()

// Safe to call repeatedly: if the service is already alive this only triggers onStartCommand().
startService(new Intent(this, MqttConnectionManagerService.class));

Solution 2:

Here is my Singleton implementation of MQTT Client:

publicclassMQTTConnectionextendsServerConnectionImpl { 
    privatestaticStringTAG = MQTTConnection.class.getSimpleName();
    privatestaticContext mContext;
    privatestaticMqttAndroidClient mqttAndroidClient;
    privatestaticString clientId;
    privatestaticMQTTConnection sMqttConnection = null;
    privateMQTTConnection() {

    }

    publicstaticMQTTConnectiongetInstance(Context context) {
        if (null == sMqttConnection) {
            mContext = context;
            init();
        }
        return sMqttConnection;
    }

    publicstaticvoidreconnectToBroker() {
        try {
            if (sMqttConnection != null) {
                sMqttConnection.disconnect();
            }

            init();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    privatestaticvoidinit() {
        sMqttConnection = newMQTTConnection();
        setClientId();
        connectToBroker();
    }

    privatestaticvoidconnectToBroker() {
        String ip = STBPreferences.getInstance(mContext).getString(Constants.KEY_MQTT_SERVER_IP, null);
        if (ip == null) {
            ip = Constants.MQTT_SERVER_IP;
        }
        final String uri = Constants.MQTT_URI_PREFIX + ip + ":" + Constants.MQTT_SERVER_PORT;
        mqttAndroidClient = newMqttAndroidClient(mContext.getApplicationContext(), uri, clientId);
        mqttAndroidClient.setCallback(newMqttCallbackExtended() {
            @OverridepublicvoidconnectComplete(boolean reconnect, String serverURI) {

                if (reconnect) {
                    LogUtil.d(TAG, "Reconnected to : " + serverURI);
                    // Because Clean Session is true, we need to re-subscribesubscribeToTopic();
                } else {
                    LogUtil.d(TAG, "Connected to: " + serverURI);
                }
            }

            @OverridepublicvoidconnectionLost(Throwable cause) {
                LogUtil.d(TAG, "The Connection was lost.");
            }

            @OverridepublicvoidmessageArrived(String topic, MqttMessage mqttMessage) throws Exception {
                String messageReceived = newString(mqttMessage.getPayload());
                LogUtil.d(TAG, "Incoming message: " + messageReceived);
                try {
                    Gson gson = newGson();
                    Message message = gson.fromJson(messageReceived, Message.class);
                    // Here you can send message to listeners for processing

                } catch (JsonSyntaxException e) {
                    // Something wrong with message format json
                    e.printStackTrace();
                }
            }

            @OverridepublicvoiddeliveryComplete(IMqttDeliveryToken token) {
                LogUtil.d(TAG, "Message delivered");

            }
        });

        MqttConnectOptions mqttConnectOptions = newMqttConnectOptions();
        mqttConnectOptions.setAutomaticReconnect(true);
        mqttConnectOptions.setCleanSession(false);

        try {
            mqttAndroidClient.connect(mqttConnectOptions, null, newIMqttActionListener() {
                @OverridepublicvoidonSuccess(IMqttToken asyncActionToken) {
                    LogUtil.d(TAG, "connect onSuccess");
                    DisconnectedBufferOptions disconnectedBufferOptions = newDisconnectedBufferOptions();
                    disconnectedBufferOptions.setBufferEnabled(true);
                    disconnectedBufferOptions.setBufferSize(100);
                    disconnectedBufferOptions.setPersistBuffer(false);
                    disconnectedBufferOptions.setDeleteOldestMessages(false);
                    mqttAndroidClient.setBufferOpts(disconnectedBufferOptions);
                    subscribeToTopic();
                }

                @OverridepublicvoidonFailure(IMqttToken asyncActionToken, Throwable exception) {
                    LogUtil.d(TAG, "Failed to connect to: " + uri);
                }
            });


        } catch (MqttException ex){
            ex.printStackTrace();
        }
    }

    publicvoidpublish(Message publishMessage) {

        try {
            Gson gson = newGson();
            String replyJson = gson.toJson(publishMessage);

            String publishTopic = clientId + Constants.MQTT_PUB_TOPIC_APPEND;
            MqttMessage message = newMqttMessage();
            message.setPayload(replyJson.getBytes());
            mqttAndroidClient.publish(publishTopic, message);
            LogUtil.d(TAG, "Message Published");
            /*if(!mqttAndroidClient.isConnected()){
                LogUtil.d(TAG, mqttAndroidClient.getBufferedMessageCount() + " messages in buffer.");
            }*/
        } catch (MqttException e) {
            LogUtil.d(TAG, "Error Publishing: " + e.getMessage());
            e.printStackTrace();
        } catch (NullPointerException e) {
            e.printStackTrace();
            if (mqttAndroidClient == null) {
                init();
            }
        }
    }

    privatestaticvoidsubscribeToTopic() {

        try {
            String subscriptionTopic = clientId + Constants.MQTT_SUB_TOPIC_APPEND;
            mqttAndroidClient.subscribe(subscriptionTopic, 0, null, newIMqttActionListener() {
                @OverridepublicvoidonSuccess(IMqttToken asyncActionToken) {
                    LogUtil.d(TAG, "subscribe onSuccess");
                }

                @OverridepublicvoidonFailure(IMqttToken asyncActionToken, Throwable exception) {
                    LogUtil.d(TAG, "Failed to subscribe");
                }
            });

        } catch (MqttException ex){
            System.err.println("Exception whilst subscribing");
            ex.printStackTrace();
        }
    }

    publicvoidunSubscribe() {
        LogUtil.d(TAG, "unSubscribe");
        final String topic = "foo/bar";
        try {
            IMqttToken unsubToken = mqttAndroidClient.unsubscribe(topic);
            unsubToken.setActionCallback(newIMqttActionListener() {
                @OverridepublicvoidonSuccess(IMqttToken asyncActionToken) {
                    // The subscription could successfully be removed from the clientLogUtil.d(TAG, "unSubscribe onSuccess");
                }

                @OverridepublicvoidonFailure(IMqttToken asyncActionToken,
                                      Throwable exception) {
                    LogUtil.d(TAG, "unSubscribe onFailure");
                    // some error occurred, this is very unlikely as even if the client// did not had a subscription to the topic the unsubscribe action// will be successfully
                }
            });
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    publicvoiddisconnect() {
        LogUtil.d(TAG, "disconnect");
        try {
            IMqttToken disconToken = mqttAndroidClient.disconnect();
            disconToken.setActionCallback(newIMqttActionListener() {
                @OverridepublicvoidonSuccess(IMqttToken asyncActionToken) {
                    // we are now successfully disconnectedLogUtil.d(TAG, "disconnect onSuccess");
                }

                @OverridepublicvoidonFailure(IMqttToken asyncActionToken,
                                      Throwable exception) {
                    LogUtil.d(TAG, "disconnect onFailure");
                    // something went wrong, but probably we are disconnected anyway
                }
            });
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    privatestaticvoidsetClientId() {
        String srNo = STBPreferences.getInstance(mContext).getString(Constants.STB_SERIAL_NO, null);
        clientId = srNo;
    }

    privateStringgetClientId() {
        if (clientId == null) {
            setClientId();
        }
        return clientId;
    }

    @OverridepublicbooleanisInternetEnabled() {
        returnNetworkUtility.isNetworkAvailable(mContext);
    }

    @OverridepublicvoidsendMessage(Message message) {
        publish(message);
    }

    @Overridepublicvoidreconnect() {
        reconnectToBroker();
    }
}

Here is the Message model. Adapt the model class to your needs.

publicclassMessage {

    /**
     * Type of data
     */@SerializedName("type")
    privateStringtype;
    /**
     * Name of component
     */@SerializedName("name")
    privateString name;
    /**
     * Data in text format
     */@Expose@SerializedName("data")
    privateObject data;

    publicMessage(Stringtype, String name, Object data) {
        this.type = type;
        this.name = name;
        this.data = data;
    }

    publicStringgetType() {
        returntype;
    }

    publicvoidsetType(Stringtype) {
        this.type = type;
    }

    publicStringgetName() {
        return name;
    }

    publicvoidsetName(String name) {
        this.name = name;
    }

    publicObjectgetData() {
        return data;
    }

    publicvoidsetData(Object data) {
        this.data = data;
    }

    @OverridepublicStringtoString() {
        return"Message{" +
                "type=" + type + "\n" +
                "name=" + name + "\n" +
                "data=" + data.toString() +
                '}';
    }
}

Get MQTT instance in your activity

MQTTConnectionmqttConnection= HTTPConnection.getInstance(mContext);

Publish Message

// sendMessage() delegates to publish() on the shared MQTT client.
mqttConnection.sendMessage(new Message( ... ));

EDIT 1: Here is my ServerConnectionImpl class for your reference.

publicclassServerConnectionImplextendsConfigurationChangeListenerImplimplementsServerConnection {

/**
 * Logging TAG
 */privatestatic final StringTAG = ServerConnectionImpl.class.getSimpleName();
/**
 * List of all listener which are registered for messages received
 */privatestaticArrayList<ConfigurationChangeListenerImpl> sConfigurationChangeListeners = newArrayList<>();

@OverridepublicbooleanisInternetEnabled() {
    returnfalse;
}

@OverridepublicResponseDatagetSubscriptionDetails(String serialNumber) {
    returnnull;
}

@OverridepublicvoidsendMessage(Message message, WebSocket webSocket) {

}

@OverridepublicvoidsendMessage(Message message) {

}

@OverridepublicvoidsendMessageToAll(Message message) {

}

//@OverridepublicstaticvoidnotifyListeners(int config, Message message, WebSocket wc) {
    switch (config) {
        caseConfigs.CAMERA: {
            for (ConfigurationChangeListenerImpl l : sConfigurationChangeListeners) {
                l.onCameraServerChanged();
            }
            break;
        }
        caseConfigs.GESTURE: {
            for (ConfigurationChangeListenerImpl l : sConfigurationChangeListeners) {
                l.onGestureCommandServerChanged();
            }
            break;
        }
        caseConfigs.MOTION_SENSOR: {
            for (ConfigurationChangeListenerImpl l : sConfigurationChangeListeners) {
                l.onMotionSensorServerChanged();
            }
            break;
        }
        caseConfigs.MESSAGE: {
            for (ConfigurationChangeListenerImpl l : sConfigurationChangeListeners) {
                l.onMessageReceived(message, wc);
            }
            break;
        }
    }
}
/**
 * Adds listener to listen to messages.
 *
 * @paramlistener
 */@Overridepublic synchronized voidaddListener(ConfigurationChangeListenerImpl listener) {
    LogUtil.d(TAG, "addListener()");
    if (listener == null) {
        thrownewIllegalArgumentException("Invalid listener " + listener);
    }
    sConfigurationChangeListeners.add(listener);
}

/**
 * Removes the listener
 *
 * @paramlistener
 */@Overridepublic synchronized voidremoveListener(ConfigurationChangeListenerImpl listener) {
    LogUtil.d(TAG, "removeListener()");
    if (listener == null) {
        thrownewIllegalArgumentException("Invalid listener " + listener);
    }
    sConfigurationChangeListeners.remove(listener);
}

@OverridepublicvoidupdateState() {

}

@Overridepublicvoidreconnect() {

}

}

You can use your own implementation for ServerConnectionImpl class.

Post a Comment for "Implement Eclipse Mqtt Android Client Using Single Connection Instance"