package com.swallowframe.core.pc.mqtt.protocol;

import com.swallowframe.core.pc.mqtt.model.DupPublishMessageStore;
import com.swallowframe.core.pc.mqtt.model.InternalMessage;
import com.swallowframe.core.pc.mqtt.model.RetainMessageStore;
import com.swallowframe.core.pc.mqtt.model.SubscribeStore;
import com.swallowframe.core.pc.mqtt.service.DupPublishMessageStoreService;
import com.swallowframe.core.pc.mqtt.service.InternalMessageService;
import com.swallowframe.core.pc.mqtt.service.MessageIdService;
import com.swallowframe.core.pc.mqtt.service.RetainMessageStoreService;
import com.swallowframe.core.pc.mqtt.service.SessionStoreService;
import com.swallowframe.core.pc.mqtt.service.SubscribeStoreService;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageFactory;
import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.util.AttributeKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

/**
 * Protocol handler for inbound MQTT PUBLISH packets.
 *
 * <p>For a packet with QoS 0, 1 or 2 the handler:
 * <ol>
 *   <li>hands a copy of the message to {@link InternalMessageService#send},</li>
 *   <li>forwards the payload to every matching subscriber that currently has a session,</li>
 *   <li>answers the publisher with PUBACK (QoS 1) or PUBREC (QoS 2).</li>
 * </ol>
 * Independently of the QoS, a packet carrying the retain flag updates the retained-message
 * store: an empty payload removes the entry for the topic, a non-empty payload replaces it.
 */
@Component
public class PublishHandler implements IProtocolHandler {
    private static final Logger log = LoggerFactory.getLogger(PublishHandler.class);

    /** Name of the channel attribute from which the publishing client's id is read. */
    private static final String CLIENT_ID_ATTRIBUTE = "clientId";

    private final SessionStoreService sessionStoreService;
    private final SubscribeStoreService subscribeStoreService;
    private final MessageIdService messageIdService;
    private final RetainMessageStoreService retainMessageStoreService;
    private final DupPublishMessageStoreService dupPublishMessageStoreService;
    private final InternalMessageService internalMessageService;

    /**
     * Creates the handler with all collaborating services (constructor injection).
     *
     * @param sessionStoreService           lookup of sessions by client id
     * @param subscribeStoreService         lookup of subscriptions matching a topic
     * @param messageIdService              source of packet identifiers for outbound QoS 1/2 publishes
     * @param retainMessageStoreService     store of retained messages keyed by topic
     * @param dupPublishMessageStoreService store of outbound QoS 1/2 publishes, keyed by subscriber client id
     * @param internalMessageService        sink that receives a copy of every accepted publish
     */
    public PublishHandler(SessionStoreService sessionStoreService, SubscribeStoreService subscribeStoreService, MessageIdService messageIdService, RetainMessageStoreService retainMessageStoreService, DupPublishMessageStoreService dupPublishMessageStoreService, InternalMessageService internalMessageService) {
        this.sessionStoreService = sessionStoreService;
        this.subscribeStoreService = subscribeStoreService;
        this.messageIdService = messageIdService;
        this.retainMessageStoreService = retainMessageStoreService;
        this.dupPublishMessageStoreService = dupPublishMessageStoreService;
        this.internalMessageService = internalMessageService;
    }

    /**
     * Processes one inbound PUBLISH packet.
     *
     * @param channel     channel the packet arrived on; its {@code "clientId"} attribute identifies the publisher
     * @param mqttMessage the received packet; must be a {@link MqttPublishMessage}
     * @throws ClassCastException if {@code mqttMessage} is not a {@link MqttPublishMessage}
     */
    @Override // com.swallowframe.core.pc.mqtt.protocol.IProtocolHandler
    public void handle(Channel channel, MqttMessage mqttMessage) {
        MqttPublishMessage publish = (MqttPublishMessage) mqttMessage;
        String clientId = (String) channel.attr(AttributeKey.valueOf(CLIENT_ID_ATTRIBUTE)).get();
        MqttQoS qos = publish.fixedHeader().qosLevel();

        boolean deliverable = qos == MqttQoS.AT_MOST_ONCE
                || qos == MqttQoS.AT_LEAST_ONCE
                || qos == MqttQoS.EXACTLY_ONCE;
        if (deliverable) {
            String topic = publish.variableHeader().topicName();
            byte[] payload = copyPayload(publish);

            this.internalMessageService.send(new InternalMessage()
                    .setTopic(topic)
                    .setMqttQoS(qos.value())
                    .setMessageBytes(payload)
                    .setDup(false)
                    .setRetain(false)
                    .setClientId(clientId));
            sendPublishMessage(topic, qos, payload, false, false);

            // Only QoS 1 and QoS 2 are acknowledged to the publisher; QoS 0 gets no reply.
            if (qos == MqttQoS.AT_LEAST_ONCE) {
                sendPubAckMessage(channel, publish.variableHeader().packetId());
            } else if (qos == MqttQoS.EXACTLY_ONCE) {
                sendPubRecMessage(channel, publish.variableHeader().packetId());
            }
        }

        if (publish.fixedHeader().isRetain()) {
            String topic = publish.variableHeader().topicName();
            // A separate copy is taken so the retained store never shares its array with the
            // message objects created above.
            byte[] retained = copyPayload(publish);
            if (retained.length == 0) {
                // An empty retained payload clears the retained message for the topic.
                this.retainMessageStoreService.remove(topic);
            } else {
                this.retainMessageStoreService.put(topic, new RetainMessageStore()
                        .setTopic(topic)
                        .setMqttQoS(qos.value())
                        .setMessageBytes(retained));
            }
        }
    }

    /**
     * Copies the readable bytes of the packet payload into a new array.
     * Uses an absolute read, so the payload's reader index is left untouched.
     */
    private static byte[] copyPayload(MqttPublishMessage publish) {
        byte[] bytes = new byte[publish.payload().readableBytes()];
        publish.payload().getBytes(publish.payload().readerIndex(), bytes);
        return bytes;
    }

    /**
     * Forwards a payload to every subscription matching {@code topic} whose client currently
     * has a session. The delivery QoS is the lower of the publish QoS and the subscription QoS.
     *
     * @param topic        topic the message was published to
     * @param mqttQoS      QoS of the inbound publish
     * @param messageBytes payload to forward
     * @param retain       retain flag to set on the outbound packets
     * @param dup          dup flag to set on the outbound packets
     */
    private void sendPublishMessage(String topic, MqttQoS mqttQoS, byte[] messageBytes, boolean retain, boolean dup) {
        for (SubscribeStore subscribeStore : this.subscribeStoreService.search(topic)) {
            if (!this.sessionStoreService.containsKey(subscribeStore.getClientId())) {
                continue;
            }
            MqttQoS effectiveQoS = mqttQoS.value() > subscribeStore.getMqttQoS()
                    ? MqttQoS.valueOf(subscribeStore.getMqttQoS())
                    : mqttQoS;

            if (effectiveQoS == MqttQoS.AT_MOST_ONCE) {
                // QoS 0 packets carry no packet identifier and are not kept for retransmission.
                MqttMessage outbound = newPublishMessage(topic, effectiveQoS, messageBytes, retain, dup, 0);
                log.info("PUBLISH - clientId: {}, topic: {}, Qos: {}", subscribeStore.getClientId(), topic, effectiveQoS.value());
                this.sessionStoreService.get(subscribeStore.getClientId()).getChannel().writeAndFlush(outbound);
            } else if (effectiveQoS == MqttQoS.AT_LEAST_ONCE || effectiveQoS == MqttQoS.EXACTLY_ONCE) {
                // Use exactly the identifier handed out by the id service: altering it can collide
                // with the next allocated id or leave the 16-bit packet-identifier range.
                int messageId = this.messageIdService.getNextMessageId();
                MqttMessage outbound = newPublishMessage(topic, effectiveQoS, messageBytes, retain, dup, messageId);
                log.info("PUBLISH - clientId: {}, topic: {}, Qos: {}, messageId: {}", subscribeStore.getClientId(), topic, effectiveQoS.value(), messageId);
                // Record the outbound publish before writing it to the channel.
                this.dupPublishMessageStoreService.put(subscribeStore.getClientId(), new DupPublishMessageStore()
                        .setClientId(subscribeStore.getClientId())
                        .setTopic(topic)
                        .setMqttQoS(effectiveQoS.value())
                        .setMessageBytes(messageBytes)
                        .setMessageId(messageId));
                this.sessionStoreService.get(subscribeStore.getClientId()).getChannel().writeAndFlush(outbound);
            }
        }
    }

    /** Builds an outbound PUBLISH packet whose payload is a fresh buffer holding {@code messageBytes}. */
    private static MqttMessage newPublishMessage(String topic, MqttQoS qos, byte[] messageBytes, boolean retain, boolean dup, int messageId) {
        return MqttMessageFactory.newMessage(
                new MqttFixedHeader(MqttMessageType.PUBLISH, dup, qos, retain, 0),
                new MqttPublishVariableHeader(topic, messageId),
                Unpooled.buffer().writeBytes(messageBytes));
    }

    /** Writes a PUBACK for the given packet identifier to {@code channel}. */
    private void sendPubAckMessage(Channel channel, int messageId) {
        channel.writeAndFlush(newAcknowledgement(MqttMessageType.PUBACK, messageId));
    }

    /** Writes a PUBREC for the given packet identifier to {@code channel}. */
    private void sendPubRecMessage(Channel channel, int messageId) {
        channel.writeAndFlush(newAcknowledgement(MqttMessageType.PUBREC, messageId));
    }

    /** Builds a payload-less acknowledgement packet of the given type for {@code messageId}. */
    private static MqttMessage newAcknowledgement(MqttMessageType type, int messageId) {
        return MqttMessageFactory.newMessage(
                new MqttFixedHeader(type, false, MqttQoS.AT_MOST_ONCE, false, 0),
                MqttMessageIdVariableHeader.from(messageId),
                null);
    }
}
