package com.swallowframe.core.pc.mqtt.protocol;

import cn.hutool.core.util.StrUtil;
import com.swallowframe.core.pc.mqtt.model.DupPubRelMessageStore;
import com.swallowframe.core.pc.mqtt.model.DupPublishMessageStore;
import com.swallowframe.core.pc.mqtt.model.SessionStore;
import com.swallowframe.core.pc.mqtt.service.AuthService;
import com.swallowframe.core.pc.mqtt.service.DupPubRelMessageStoreService;
import com.swallowframe.core.pc.mqtt.service.DupPublishMessageStoreService;
import com.swallowframe.core.pc.mqtt.service.SessionStoreService;
import com.swallowframe.core.pc.mqtt.service.SubscribeStoreService;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.handler.codec.mqtt.MqttConnAckVariableHeader;
import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttIdentifierRejectedException;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageFactory;
import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttUnacceptableProtocolVersionException;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.AttributeKey;
import io.netty.util.CharsetUtil;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

@Component
/* loaded from: input_file:com/swallowframe/core/pc/mqtt/protocol/ConnectHandler.class */
public class ConnectHandler implements IProtocolHandler {
    private static final Logger log = LoggerFactory.getLogger(ConnectHandler.class);
    private final AuthService authService;
    private final SessionStoreService sessionStoreService;
    private final DupPublishMessageStoreService dupPublishMessageStoreService;
    private final DupPubRelMessageStoreService dupPubRelMessageStoreService;
    private final SubscribeStoreService subscribeStoreService;

    public ConnectHandler(AuthService authService, SessionStoreService sessionStoreService, DupPublishMessageStoreService dupPublishMessageStoreService, DupPubRelMessageStoreService dupPubRelMessageStoreService, SubscribeStoreService subscribeStoreService) {
        this.authService = authService;
        this.sessionStoreService = sessionStoreService;
        this.dupPublishMessageStoreService = dupPublishMessageStoreService;
        this.dupPubRelMessageStoreService = dupPubRelMessageStoreService;
        this.subscribeStoreService = subscribeStoreService;
    }

    @Override // com.swallowframe.core.pc.mqtt.protocol.IProtocolHandler
    public void handle(Channel channel, MqttMessage mqttMessage) {
        MqttConnectMessage mqttConnectMessage = (MqttConnectMessage) mqttMessage;
        if (mqttConnectMessage.decoderResult().isFailure()) {
            Throwable cause = mqttConnectMessage.decoderResult().cause();
            if (cause instanceof MqttUnacceptableProtocolVersionException) {
                channel.writeAndFlush(MqttMessageFactory.newMessage(new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0), new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION, false), (Object) null));
                channel.close();
                return;
            } else if (!(cause instanceof MqttIdentifierRejectedException)) {
                channel.close();
                return;
            } else {
                channel.writeAndFlush(MqttMessageFactory.newMessage(new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0), new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED, false), (Object) null));
                channel.close();
                return;
            }
        }
        if (StrUtil.isBlank(mqttConnectMessage.payload().clientIdentifier())) {
            channel.writeAndFlush(MqttMessageFactory.newMessage(new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0), new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED, false), (Object) null));
            channel.close();
            return;
        }
        if (!this.authService.checkValid(mqttConnectMessage.payload().userName(), mqttConnectMessage.payload().passwordInBytes() == null ? null : new String(mqttConnectMessage.payload().passwordInBytes(), CharsetUtil.UTF_8))) {
            channel.writeAndFlush(MqttMessageFactory.newMessage(new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0), new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD, false), (Object) null));
            channel.close();
            return;
        }
        if (this.sessionStoreService.containsKey(mqttConnectMessage.payload().clientIdentifier())) {
            SessionStore sessionStore = this.sessionStoreService.get(mqttConnectMessage.payload().clientIdentifier());
            Channel channel2 = sessionStore.getChannel();
            if (Boolean.valueOf(sessionStore.isCleanSession()).booleanValue()) {
                this.sessionStoreService.remove(mqttConnectMessage.payload().clientIdentifier());
                this.subscribeStoreService.removeForClient(mqttConnectMessage.payload().clientIdentifier());
                this.dupPublishMessageStoreService.removeByClient(mqttConnectMessage.payload().clientIdentifier());
                this.dupPubRelMessageStoreService.removeByClient(mqttConnectMessage.payload().clientIdentifier());
            }
            channel2.close();
        }
        SessionStore sessionStore2 = new SessionStore(mqttConnectMessage.payload().clientIdentifier(), channel, mqttConnectMessage.variableHeader().isCleanSession(), null);
        if (mqttConnectMessage.variableHeader().isWillFlag()) {
            sessionStore2.setWillMessage((MqttPublishMessage) MqttMessageFactory.newMessage(new MqttFixedHeader(MqttMessageType.PUBLISH, false, MqttQoS.valueOf(mqttConnectMessage.variableHeader().willQos()), mqttConnectMessage.variableHeader().isWillRetain(), 0), new MqttPublishVariableHeader(mqttConnectMessage.payload().willTopic(), 0), Unpooled.buffer().writeBytes(mqttConnectMessage.payload().willMessageInBytes())));
        }
        if (mqttConnectMessage.variableHeader().keepAliveTimeSeconds() > 0) {
            if (channel.pipeline().names().contains("idle")) {
                channel.pipeline().remove("idle");
            }
            channel.pipeline().addFirst("idle", new IdleStateHandler(0, 0, Math.round(mqttConnectMessage.variableHeader().keepAliveTimeSeconds() * 1.5f)));
        }
        this.sessionStoreService.put(mqttConnectMessage.payload().clientIdentifier(), sessionStore2);
        channel.attr(AttributeKey.valueOf("clientId")).set(mqttConnectMessage.payload().clientIdentifier());
        channel.writeAndFlush(MqttMessageFactory.newMessage(new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0), new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_ACCEPTED, Boolean.valueOf(this.sessionStoreService.containsKey(mqttConnectMessage.payload().clientIdentifier()) && !mqttConnectMessage.variableHeader().isCleanSession()).booleanValue()), (Object) null));
        log.info("CONNECT - clientId: {}, cleanSession: {}", mqttConnectMessage.payload().clientIdentifier(), Boolean.valueOf(mqttConnectMessage.variableHeader().isCleanSession()));
        if (mqttConnectMessage.variableHeader().isCleanSession()) {
            return;
        }
        List<DupPublishMessageStore> list = this.dupPublishMessageStoreService.get(mqttConnectMessage.payload().clientIdentifier());
        List<DupPubRelMessageStore> list2 = this.dupPubRelMessageStoreService.get(mqttConnectMessage.payload().clientIdentifier());
        list.forEach(dupPublishMessageStore -> {
            channel.writeAndFlush(MqttMessageFactory.newMessage(new MqttFixedHeader(MqttMessageType.PUBLISH, true, MqttQoS.valueOf(dupPublishMessageStore.getMqttQoS()), false, 0), new MqttPublishVariableHeader(dupPublishMessageStore.getTopic(), dupPublishMessageStore.getMessageId()), Unpooled.buffer().writeBytes(dupPublishMessageStore.getMessageBytes())));
        });
        list2.forEach(dupPubRelMessageStore -> {
            channel.writeAndFlush(MqttMessageFactory.newMessage(new MqttFixedHeader(MqttMessageType.PUBREL, true, MqttQoS.AT_MOST_ONCE, false, 0), MqttMessageIdVariableHeader.from(dupPubRelMessageStore.getMessageId()), (Object) null));
        });
    }
}
