Implement Eclipse Mqtt Android Client Using Single Connection Instance
I am using Eclipse Paho android mqtt service in my app. I am able to subscribe and publish the messages to mqtt broker. I have couple of Activities in the app, when any activity is
Solution 1:
A 'better' way would be to create a Service
which connects/reconnects to the MQTT Broker.
I created my own service called MqttConnectionManagerService
which maintains and manages connection to the broker.
Key features of this solution:
- Service maintains a single instance as long as it is alive.
- If service is killed, Android restarts it (because
START_STICKY
) - Service can be started when device boots.
- Service runs in the background and is always connected to receive notifications.
- If the service is alive, calling
startService(..)
again would trigger itsonStartCommand()
method (and notonCreate()
). In this method, we simply check if this client is connected to the broker and connect/reconnect if required.
Sample code:
MqttConnectionManagerService
publicclassMqttConnectionManagerServiceextendsService {
privateMqttAndroidClient client;
privateMqttConnectOptions options;
@OverridepublicvoidonCreate() {
super.onCreate();
options = createMqttConnectOptions();
client = createMqttAndroidClient();
}
@Overridepublic int onStartCommand(Intent intent, int flags, int startId) {
this.connect(client, options);
returnSTART_STICKY;
}
privateMqttConnectOptionscreateMqttConnectOptions() {
//create and return options
}
privateMqttAndroidClientcreateMqttAndroidClient() {
//create and return client
}
publicvoidconnect(final MqttAndroidClient client, MqttConnectOptions options) {
try {
if (!client.isConnected()) {
IMqttToken token = client.connect(options);
//on successful connection, publish or subscribe as usual
token.setActionCallback(newIMqttActionListener() {..});
client.setCallback(newMqttCallback() {..});
}
} catch (MqttException e) {
//handle e
}
}
}
AndroidManifest.xml
<?xml version="1.0" encoding="utf-8"?><manifestxmlns:android="http://schemas.android.com/apk/res/android"package="..."><!-- Permissions required to receive BOOT_COMPLETED event --><uses-permissionandroid:name="android.permission.RECEIVE_BOOT_COMPLETED" /><applicationandroid:allowBackup="true"android:icon="@mipmap/ic_launcher"android:label="@string/app_name"android:supportsRtl="true"android:theme="@style/AppTheme"><!-- activities go here --><!-- BroadcastReceiver that starts MqttConnectionManagerService on device boot --><receiverandroid:name=".MqttServiceStartReceiver"><intent-filter><actionandroid:name="android.intent.action.BOOT_COMPLETED" /></intent-filter></receiver><!-- Services required for using MQTT --><serviceandroid:name="org.eclipse.paho.android.service.MqttService" /><serviceandroid:name=".MqttConnectionManagerService" /></application></manifest>
MqttServiceStartReceiver
publicclassMqttServiceStartReceiverextendsBroadcastReceiver {
@OverridepublicvoidonReceive(Context context, Intent intent) {
context.startService(newIntent(context, MqttConnectionManagerService.class));
}
}
In your Activity's
onResume()
startService(newIntent(this, MqttConnectionManagerService.class));
Solution 2:
Here is my Singleton implementation of MQTT Client:
publicclassMQTTConnectionextendsServerConnectionImpl {
privatestaticStringTAG = MQTTConnection.class.getSimpleName();
privatestaticContext mContext;
privatestaticMqttAndroidClient mqttAndroidClient;
privatestaticString clientId;
privatestaticMQTTConnection sMqttConnection = null;
privateMQTTConnection() {
}
publicstaticMQTTConnectiongetInstance(Context context) {
if (null == sMqttConnection) {
mContext = context;
init();
}
return sMqttConnection;
}
publicstaticvoidreconnectToBroker() {
try {
if (sMqttConnection != null) {
sMqttConnection.disconnect();
}
init();
} catch (Exception e) {
e.printStackTrace();
}
}
privatestaticvoidinit() {
sMqttConnection = newMQTTConnection();
setClientId();
connectToBroker();
}
privatestaticvoidconnectToBroker() {
String ip = STBPreferences.getInstance(mContext).getString(Constants.KEY_MQTT_SERVER_IP, null);
if (ip == null) {
ip = Constants.MQTT_SERVER_IP;
}
final String uri = Constants.MQTT_URI_PREFIX + ip + ":" + Constants.MQTT_SERVER_PORT;
mqttAndroidClient = newMqttAndroidClient(mContext.getApplicationContext(), uri, clientId);
mqttAndroidClient.setCallback(newMqttCallbackExtended() {
@OverridepublicvoidconnectComplete(boolean reconnect, String serverURI) {
if (reconnect) {
LogUtil.d(TAG, "Reconnected to : " + serverURI);
// Because Clean Session is true, we need to re-subscribesubscribeToTopic();
} else {
LogUtil.d(TAG, "Connected to: " + serverURI);
}
}
@OverridepublicvoidconnectionLost(Throwable cause) {
LogUtil.d(TAG, "The Connection was lost.");
}
@OverridepublicvoidmessageArrived(String topic, MqttMessage mqttMessage) throws Exception {
String messageReceived = newString(mqttMessage.getPayload());
LogUtil.d(TAG, "Incoming message: " + messageReceived);
try {
Gson gson = newGson();
Message message = gson.fromJson(messageReceived, Message.class);
// Here you can send message to listeners for processing
} catch (JsonSyntaxException e) {
// Something wrong with message format json
e.printStackTrace();
}
}
@OverridepublicvoiddeliveryComplete(IMqttDeliveryToken token) {
LogUtil.d(TAG, "Message delivered");
}
});
MqttConnectOptions mqttConnectOptions = newMqttConnectOptions();
mqttConnectOptions.setAutomaticReconnect(true);
mqttConnectOptions.setCleanSession(false);
try {
mqttAndroidClient.connect(mqttConnectOptions, null, newIMqttActionListener() {
@OverridepublicvoidonSuccess(IMqttToken asyncActionToken) {
LogUtil.d(TAG, "connect onSuccess");
DisconnectedBufferOptions disconnectedBufferOptions = newDisconnectedBufferOptions();
disconnectedBufferOptions.setBufferEnabled(true);
disconnectedBufferOptions.setBufferSize(100);
disconnectedBufferOptions.setPersistBuffer(false);
disconnectedBufferOptions.setDeleteOldestMessages(false);
mqttAndroidClient.setBufferOpts(disconnectedBufferOptions);
subscribeToTopic();
}
@OverridepublicvoidonFailure(IMqttToken asyncActionToken, Throwable exception) {
LogUtil.d(TAG, "Failed to connect to: " + uri);
}
});
} catch (MqttException ex){
ex.printStackTrace();
}
}
publicvoidpublish(Message publishMessage) {
try {
Gson gson = newGson();
String replyJson = gson.toJson(publishMessage);
String publishTopic = clientId + Constants.MQTT_PUB_TOPIC_APPEND;
MqttMessage message = newMqttMessage();
message.setPayload(replyJson.getBytes());
mqttAndroidClient.publish(publishTopic, message);
LogUtil.d(TAG, "Message Published");
/*if(!mqttAndroidClient.isConnected()){
LogUtil.d(TAG, mqttAndroidClient.getBufferedMessageCount() + " messages in buffer.");
}*/
} catch (MqttException e) {
LogUtil.d(TAG, "Error Publishing: " + e.getMessage());
e.printStackTrace();
} catch (NullPointerException e) {
e.printStackTrace();
if (mqttAndroidClient == null) {
init();
}
}
}
privatestaticvoidsubscribeToTopic() {
try {
String subscriptionTopic = clientId + Constants.MQTT_SUB_TOPIC_APPEND;
mqttAndroidClient.subscribe(subscriptionTopic, 0, null, newIMqttActionListener() {
@OverridepublicvoidonSuccess(IMqttToken asyncActionToken) {
LogUtil.d(TAG, "subscribe onSuccess");
}
@OverridepublicvoidonFailure(IMqttToken asyncActionToken, Throwable exception) {
LogUtil.d(TAG, "Failed to subscribe");
}
});
} catch (MqttException ex){
System.err.println("Exception whilst subscribing");
ex.printStackTrace();
}
}
publicvoidunSubscribe() {
LogUtil.d(TAG, "unSubscribe");
final String topic = "foo/bar";
try {
IMqttToken unsubToken = mqttAndroidClient.unsubscribe(topic);
unsubToken.setActionCallback(newIMqttActionListener() {
@OverridepublicvoidonSuccess(IMqttToken asyncActionToken) {
// The subscription could successfully be removed from the clientLogUtil.d(TAG, "unSubscribe onSuccess");
}
@OverridepublicvoidonFailure(IMqttToken asyncActionToken,
Throwable exception) {
LogUtil.d(TAG, "unSubscribe onFailure");
// some error occurred, this is very unlikely as even if the client// did not had a subscription to the topic the unsubscribe action// will be successfully
}
});
} catch (MqttException e) {
e.printStackTrace();
}
}
publicvoiddisconnect() {
LogUtil.d(TAG, "disconnect");
try {
IMqttToken disconToken = mqttAndroidClient.disconnect();
disconToken.setActionCallback(newIMqttActionListener() {
@OverridepublicvoidonSuccess(IMqttToken asyncActionToken) {
// we are now successfully disconnectedLogUtil.d(TAG, "disconnect onSuccess");
}
@OverridepublicvoidonFailure(IMqttToken asyncActionToken,
Throwable exception) {
LogUtil.d(TAG, "disconnect onFailure");
// something went wrong, but probably we are disconnected anyway
}
});
} catch (MqttException e) {
e.printStackTrace();
}
}
privatestaticvoidsetClientId() {
String srNo = STBPreferences.getInstance(mContext).getString(Constants.STB_SERIAL_NO, null);
clientId = srNo;
}
privateStringgetClientId() {
if (clientId == null) {
setClientId();
}
return clientId;
}
@OverridepublicbooleanisInternetEnabled() {
returnNetworkUtility.isNetworkAvailable(mContext);
}
@OverridepublicvoidsendMessage(Message message) {
publish(message);
}
@Overridepublicvoidreconnect() {
reconnectToBroker();
}
}
Here is the message Model. Change Model class for your need.
publicclassMessage {
/**
* Type of data
*/@SerializedName("type")
privateStringtype;
/**
* Name of component
*/@SerializedName("name")
privateString name;
/**
* Data in text format
*/@Expose@SerializedName("data")
privateObject data;
publicMessage(Stringtype, String name, Object data) {
this.type = type;
this.name = name;
this.data = data;
}
publicStringgetType() {
returntype;
}
publicvoidsetType(Stringtype) {
this.type = type;
}
publicStringgetName() {
return name;
}
publicvoidsetName(String name) {
this.name = name;
}
publicObjectgetData() {
return data;
}
publicvoidsetData(Object data) {
this.data = data;
}
@OverridepublicStringtoString() {
return"Message{" +
"type=" + type + "\n" +
"name=" + name + "\n" +
"data=" + data.toString() +
'}';
}
}
Get MQTT instance in your activity
MQTTConnectionmqttConnection= HTTPConnection.getInstance(mContext);
Publish Message
mqttConnectin.sendMessage(newMessage( ... ));
EDIT 1: Here is my ServerConnectionImpl class for your reference.
publicclassServerConnectionImplextendsConfigurationChangeListenerImplimplementsServerConnection {
/**
* Logging TAG
*/privatestatic final StringTAG = ServerConnectionImpl.class.getSimpleName();
/**
* List of all listener which are registered for messages received
*/privatestaticArrayList<ConfigurationChangeListenerImpl> sConfigurationChangeListeners = newArrayList<>();
@OverridepublicbooleanisInternetEnabled() {
returnfalse;
}
@OverridepublicResponseDatagetSubscriptionDetails(String serialNumber) {
returnnull;
}
@OverridepublicvoidsendMessage(Message message, WebSocket webSocket) {
}
@OverridepublicvoidsendMessage(Message message) {
}
@OverridepublicvoidsendMessageToAll(Message message) {
}
//@OverridepublicstaticvoidnotifyListeners(int config, Message message, WebSocket wc) {
switch (config) {
caseConfigs.CAMERA: {
for (ConfigurationChangeListenerImpl l : sConfigurationChangeListeners) {
l.onCameraServerChanged();
}
break;
}
caseConfigs.GESTURE: {
for (ConfigurationChangeListenerImpl l : sConfigurationChangeListeners) {
l.onGestureCommandServerChanged();
}
break;
}
caseConfigs.MOTION_SENSOR: {
for (ConfigurationChangeListenerImpl l : sConfigurationChangeListeners) {
l.onMotionSensorServerChanged();
}
break;
}
caseConfigs.MESSAGE: {
for (ConfigurationChangeListenerImpl l : sConfigurationChangeListeners) {
l.onMessageReceived(message, wc);
}
break;
}
}
}
/**
* Adds listener to listen to messages.
*
* @paramlistener
*/@Overridepublic synchronized voidaddListener(ConfigurationChangeListenerImpl listener) {
LogUtil.d(TAG, "addListener()");
if (listener == null) {
thrownewIllegalArgumentException("Invalid listener " + listener);
}
sConfigurationChangeListeners.add(listener);
}
/**
* Removes the listener
*
* @paramlistener
*/@Overridepublic synchronized voidremoveListener(ConfigurationChangeListenerImpl listener) {
LogUtil.d(TAG, "removeListener()");
if (listener == null) {
thrownewIllegalArgumentException("Invalid listener " + listener);
}
sConfigurationChangeListeners.remove(listener);
}
@OverridepublicvoidupdateState() {
}
@Overridepublicvoidreconnect() {
}
}
You can use your own implementation for ServerConnectionImpl class.
Post a Comment for "Implement Eclipse Mqtt Android Client Using Single Connection Instance"